From ea2643970ef61c28c0677ac7a4ac63abf48cfd5f Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Sun, 30 Jan 2022 19:51:18 +0100 Subject: [PATCH] Fix counters in validator monitor totals mode The current counters set gauges etc to the value of the _last_ validator to be processed - as the name of the feature implies, we should be using sums instead. * fix missing beacon state metrics on startup, pre-first-head-selection * fix epoch metrics not being updated on cross-epoch reorg --- .../consensus_object_pools/blockchain_dag.nim | 84 +++-- beacon_chain/validators/validator_monitor.nim | 311 ++++++++++++------ 2 files changed, 244 insertions(+), 151 deletions(-) diff --git a/beacon_chain/consensus_object_pools/blockchain_dag.nim b/beacon_chain/consensus_object_pools/blockchain_dag.nim index e54a78fb70..ba58754df1 100644 --- a/beacon_chain/consensus_object_pools/blockchain_dag.nim +++ b/beacon_chain/consensus_object_pools/blockchain_dag.nim @@ -422,6 +422,38 @@ proc getForkedBlock*( dag.getForkedBlock(blck.bid).expect( "BlockRef block should always load, database corrupt?") +proc updateBeaconMetrics(state: StateData, cache: var StateCache) = + # https://github.com/ethereum/eth2.0-metrics/blob/master/metrics.md#additional-metrics + # both non-negative, so difference can't overflow or underflow int64 + + beacon_head_root.set(state.blck.root.toGaugeValue) + beacon_head_slot.set(state.blck.slot.toGaugeValue) + + withState(state.data): + beacon_pending_deposits.set( + (state.data.eth1_data.deposit_count - + state.data.eth1_deposit_index).toGaugeValue) + beacon_processed_deposits_total.set( + state.data.eth1_deposit_index.toGaugeValue) + + beacon_current_justified_epoch.set( + state.data.current_justified_checkpoint.epoch.toGaugeValue) + beacon_current_justified_root.set( + state.data.current_justified_checkpoint.root.toGaugeValue) + beacon_previous_justified_epoch.set( + state.data.previous_justified_checkpoint.epoch.toGaugeValue) + beacon_previous_justified_root.set( + 
state.data.previous_justified_checkpoint.root.toGaugeValue) + beacon_finalized_epoch.set( + state.data.finalized_checkpoint.epoch.toGaugeValue) + beacon_finalized_root.set( + state.data.finalized_checkpoint.root.toGaugeValue) + + let active_validators = count_active_validators( + state.data, state.data.slot.epoch, cache).toGaugeValue + beacon_active_validators.set(active_validators) + beacon_current_active_validators.set(active_validators) + proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, validatorMonitor: ref ValidatorMonitor, updateFlags: UpdateFlags, onBlockCb: OnBlockCallback = nil, onHeadCb: OnHeadCallback = nil, @@ -605,7 +637,7 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, genesisRef, tailRef, headRef, tailRoot, headRoot, stateFork, configFork quit 1 - assign(dag.clearanceState, dag.headState) + # db state is likely an epoch boundary state which is what we want for epochs assign(dag.epochRefState, dag.headState) dag.forkDigests = newClone ForkDigests.init( @@ -631,6 +663,11 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, quit 1 + # Clearance most likely happens from head - assign it after rewinding head + assign(dag.clearanceState, dag.headState) + + updateBeaconMetrics(dag.headState, cache) + # The tail block is "implicitly" finalized as it was given either as a # checkpoint block, or is the genesis, thus we use it as a lower bound when # computing the finalized head @@ -658,8 +695,6 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, let stateTick = Moment.now() - dag.clearanceState = dag.headState - # Pruning metadata dag.lastPrunePoint = dag.finalizedHead @@ -1374,6 +1409,8 @@ proc updateHead*( dag.db.putHeadBlock(newHead.root) + updateBeaconMetrics(dag.headState, cache) + withState(dag.headState.data): when stateFork >= BeaconStateFork.Altair: dag.headSyncCommittees = state.data.get_sync_committee_cache(cache) @@ -1433,17 +1470,6 @@ proc updateHead*(
prevDepBlock.root) dag.onHeadChanged(data) - # https://github.com/ethereum/eth2.0-metrics/blob/master/metrics.md#additional-metrics - # both non-negative, so difference can't overflow or underflow int64 - beacon_pending_deposits.set( - getStateField(dag.headState.data, eth1_data).deposit_count.toGaugeValue - - getStateField(dag.headState.data, eth1_deposit_index).toGaugeValue) - beacon_processed_deposits_total.set( - getStateField(dag.headState.data, eth1_deposit_index).toGaugeValue) - - beacon_head_root.set newHead.root.toGaugeValue - beacon_head_slot.set newHead.slot.toGaugeValue - withState(dag.headState.data): # Every time the head changes, the "canonical" view of balances and other # state-related metrics change - notify the validator monitor. @@ -1451,31 +1477,6 @@ proc updateHead*( # of such updates happening - at most once per valid block. dag.validatorMonitor[].registerState(state.data) - if lastHead.slot.epoch != newHead.slot.epoch: - # Epoch updated - in theory, these could happen when the wall clock - # changes epoch, even if there is no new block / head, but we'll delay - # updating them until a block confirms the change - beacon_current_justified_epoch.set( - getStateField( - dag.headState.data, current_justified_checkpoint).epoch.toGaugeValue) - beacon_current_justified_root.set( - getStateField( - dag.headState.data, current_justified_checkpoint).root.toGaugeValue) - beacon_previous_justified_epoch.set( - getStateField( - dag.headState.data, previous_justified_checkpoint).epoch.toGaugeValue) - beacon_previous_justified_root.set( - getStateField( - dag.headState.data, previous_justified_checkpoint).root.toGaugeValue) - - let - epochRef = getEpochRef(dag, dag.headState, cache) - number_of_active_validators = - epochRef.shuffled_active_validator_indices.lenu64().toGaugeValue - - beacon_active_validators.set(number_of_active_validators) - beacon_current_active_validators.set(number_of_active_validators) - if finalizedHead != dag.finalizedHead: debug 
"Reached new finalization checkpoint", head = shortLog(dag.headState.blck), @@ -1504,11 +1505,6 @@ proc updateHead*( dag.updateFinalizedBlocks() - beacon_finalized_epoch.set(getStateField( - dag.headState.data, finalized_checkpoint).epoch.toGaugeValue) - beacon_finalized_root.set(getStateField( - dag.headState.data, finalized_checkpoint).root.toGaugeValue) - # Pruning the block dag is required every time the finalized head changes # in order to clear out blocks that are no longer viable and should # therefore no longer be considered as part of the chain we're following diff --git a/beacon_chain/validators/validator_monitor.nim b/beacon_chain/validators/validator_monitor.nim index 285720d73a..11837a4bdf 100644 --- a/beacon_chain/validators/validator_monitor.nim +++ b/beacon_chain/validators/validator_monitor.nim @@ -83,6 +83,8 @@ declareHistogram validator_monitor_prev_epoch_sync_contribution_min_delay_second "The min delay between when the validator should send the sync contribution and when it was received.", labels = ["validator"] declareGauge validator_monitor_validator_in_current_sync_committee, "Is the validator in the current sync committee (1 for true and 0 for false)", labels = ["validator"] +declareGauge validator_monitor_validator_in_next_sync_committee, + "Is the validator in the next sync committee (1 for true and 0 for false)", labels = ["validator"] declareGauge validator_monitor_validators_total, "Count of validators that are specifically monitored by this beacon node" @@ -125,6 +127,9 @@ declareCounter validator_monitor_proposer_slashing_total, declareCounter validator_monitor_attester_slashing_total, "Number of attester slashings seen", labels = ["src", "validator"] +const + total = "total" # what we use for label when using "totals" mode + type EpochSummary = object ## Similar to the state transition, we collect everything that happens in @@ -187,6 +192,9 @@ type template toGaugeValue(v: bool): int64 = if v: 1 else: 0 +template toGaugeValue(v: 
TimeDiff): float = + toFloatSeconds(v) + proc update_if_lt[T](current: var Option[T], val: T) = if current.isNone() or val < current.get(): current = some(val) @@ -207,7 +215,7 @@ proc addMonitor*( template metricId: string = mixin self, id - if self.totals: "total" else: id + if self.totals: total else: id proc addAutoMonitor*( self: var ValidatorMonitor, pubkey: ValidatorPubKey, @@ -242,81 +250,110 @@ proc updateEpoch(self: var ValidatorMonitor, epoch: Epoch) = return let - clearMonitor = epoch > self.epoch + 1 + monitorEpoch = self.epoch # index of the EpochSummary that we'll first report, then clear summaryIdx = epoch.summaryIdx - if clearMonitor: - # More than one epoch passed since the last check which makes it difficult - # to report correctly with the amount of data we store - skip this round - # and hope things improve - notice "Resetting validator monitoring", epoch, monitorEpoch = self.epoch - self.epoch = epoch - validator_monitor_validators_total.set(self.monitors.len().int64) - for (_, monitor) in self.monitors.mpairs(): - if clearMonitor: - monitor.summaries = default(type(monitor.summaries)) - continue - - let - id = monitor.id - - let summary = monitor.summaries[summaryIdx] - - validator_monitor_prev_epoch_attestations_total.set( - summary.attestations, [metricId]) - - if summary.attestation_min_delay.isSome(): - validator_monitor_prev_epoch_attestations_min_delay_seconds.observe( - summary.attestation_min_delay.get().toFloatSeconds(), [metricId]) - - validator_monitor_prev_epoch_attestation_aggregate_inclusions.set( - summary.attestation_aggregate_inclusions, [metricId]) - validator_monitor_prev_epoch_attestation_block_inclusions.set( - summary.attestation_block_inclusions, [metricId]) - - if summary.attestation_min_block_inclusion_distance.isSome(): - validator_monitor_prev_epoch_attestation_block_min_inclusion_distance.set( - summary.attestation_min_block_inclusion_distance.get().int64, [metricId]) - - 
validator_monitor_prev_epoch_sync_committee_messages_total.set( - summary.sync_committee_messages, [metricId]) - - if summary.sync_committee_message_min_delay.isSome(): - validator_monitor_prev_epoch_sync_committee_messages_min_delay_seconds.observe( - summary.sync_committee_message_min_delay.get().toFloatSeconds(), [metricId]) - - validator_monitor_prev_epoch_sync_contribution_inclusions.set( - summary.sync_signature_contribution_inclusions, [metricId]) - validator_monitor_prev_epoch_sync_signature_block_inclusions.set( - summary.sync_signature_block_inclusions, [metricId]) - - validator_monitor_prev_epoch_sync_contributions_total.set( - summary.sync_contributions, [metricId]) - if summary.sync_contribution_min_delay.isSome(): - validator_monitor_prev_epoch_sync_contribution_min_delay_seconds.observe( - summary.sync_contribution_min_delay.get().toFloatSeconds(), [metricId]) - - validator_monitor_prev_epoch_aggregates_total.set( - summary.aggregates, [metricId]) - - if summary.aggregate_min_delay.isSome(): - validator_monitor_prev_epoch_aggregates_min_delay_seconds.observe( - summary.aggregate_min_delay.get().toFloatSeconds(), [metricId]) - - validator_monitor_prev_epoch_exits_total.set( - summary.exits, [metricId]) - - validator_monitor_prev_epoch_proposer_slashings_total.set( - summary.proposer_slashings, [metricId]) + if epoch > monitorEpoch + 1: + # More than one epoch passed since the last check which makes it difficult + # to report correctly with the amount of data we store - skip this round + # and hope things improve + notice "Resetting validator monitoring", epoch, monitorEpoch - validator_monitor_prev_epoch_attester_slashings_total.set( - summary.attester_slashings, [metricId]) + for (_, monitor) in self.monitors.mpairs(): + reset(monitor.summaries) + return - monitor.summaries[summaryIdx] = default(type(monitor.summaries[summaryIdx])) + template setAll(metric, name: untyped) = + if self.totals: + var agg: int64 + for monitor {.inject.} in 
self.monitors.mvalues: + agg += monitor.summaries[summaryIdx].name + metric.set(agg, [total]) + else: + for monitor {.inject.} in self.monitors.mvalues: + metric.set(monitor.summaries[summaryIdx].name, [monitor.id]) + + template observeAll(metric, name: untyped) = + for monitor {.inject.} in self.monitors.mvalues: + if monitor.summaries[summaryIdx].name.isSome(): + metric.observe( + monitor.summaries[summaryIdx].name.get.toGaugeValue(), + [if self.totals: total else: monitor.id]) + + + setAll( + validator_monitor_prev_epoch_attestations_total, + attestations) + + observeAll( + validator_monitor_prev_epoch_attestations_min_delay_seconds, + attestation_min_delay) + + setAll( + validator_monitor_prev_epoch_attestation_aggregate_inclusions, + attestation_aggregate_inclusions) + + setAll( + validator_monitor_prev_epoch_attestation_block_inclusions, + attestation_block_inclusions) + + setAll( + validator_monitor_prev_epoch_sync_committee_messages_total, + sync_committee_messages) + + observeAll( + validator_monitor_prev_epoch_sync_committee_messages_min_delay_seconds, + sync_committee_message_min_delay) + + setAll( + validator_monitor_prev_epoch_sync_contribution_inclusions, + sync_signature_contribution_inclusions) + setAll( + validator_monitor_prev_epoch_sync_signature_block_inclusions, + sync_signature_block_inclusions) + + setAll( + validator_monitor_prev_epoch_sync_contributions_total, + sync_contributions) + + observeAll( + validator_monitor_prev_epoch_sync_contribution_min_delay_seconds, + sync_contribution_min_delay) + + setAll( + validator_monitor_prev_epoch_aggregates_total, + aggregates) + + observeAll( + validator_monitor_prev_epoch_aggregates_min_delay_seconds, + aggregate_min_delay) + + setAll( + validator_monitor_prev_epoch_exits_total, + exits) + + setAll( + validator_monitor_prev_epoch_proposer_slashings_total, + proposer_slashings) + + setAll( + validator_monitor_prev_epoch_attester_slashings_total, + attester_slashings) + + if not self.totals: + for 
monitor in self.monitors.mvalues: + if monitor.summaries[summaryIdx]. + attestation_min_block_inclusion_distance.isSome: + validator_monitor_prev_epoch_attestation_block_min_inclusion_distance.set( + monitor.summaries[summaryIdx]. + attestation_min_block_inclusion_distance.get().int64, [monitor.id]) + + for monitor in self.monitors.mvalues: + reset(monitor.summaries[summaryIdx]) func is_active_unslashed_in_previous_epoch(status: RewardStatus): bool = let flags = status.flags @@ -354,6 +391,8 @@ proc registerEpochInfo*( if epoch < 2 or self.monitors.len == 0: return + var in_current_sync_committee, in_next_sync_committee: int64 + withEpochInfo(info): for pubkey, monitor in self.monitors: if monitor.index.isNone: @@ -435,7 +474,8 @@ proc registerEpochInfo*( let current_epoch = epoch - 1 if state.current_sync_committee.pubkeys.data.contains(pubkey): - validator_monitor_validator_in_current_sync_committee.set(1, [metricId]) + if not self.totals: + validator_monitor_validator_in_current_sync_committee.set(1, [metricId]) self.withEpochSummary(monitor[], current_epoch): info "Current epoch sync signatures", @@ -443,11 +483,34 @@ proc registerEpochInfo*( expected = SLOTS_PER_EPOCH, epoch = current_epoch, validator = id + in_current_sync_committee += 1 + + else: + if not self.totals: + validator_monitor_validator_in_current_sync_committee.set(0, [metricId]) + debug "Validator isn't part of the current sync committee", + epoch = current_epoch, + validator = id + + if state.next_sync_committee.pubkeys.data.contains(pubkey): + if not self.totals: + validator_monitor_validator_in_next_sync_committee.set(1, [metricId]) + + self.withEpochSummary(monitor[], current_epoch): + info "Validator in next sync committee", + epoch = current_epoch, + validator = id + in_next_sync_committee += 1 + else: - validator_monitor_validator_in_current_sync_committee.set(0, [metricId]) - debug "Validator isn't part of the current sync committee", - epoch = current_epoch, - validator = id + if not 
self.totals: + validator_monitor_validator_in_next_sync_committee.set(0, [metricId]) + + if self.totals: + validator_monitor_validator_in_current_sync_committee.set( + in_current_sync_committee, [total]) + validator_monitor_validator_in_next_sync_committee.set( + in_next_sync_committee, [total]) self.updateEpoch(epoch) @@ -467,35 +530,66 @@ proc registerState*(self: var ValidatorMonitor, state: ForkyBeaconState) = current_epoch = state.slot.epoch # Update metrics for monitored validators according to the latest rewards - for (_, monitor) in self.monitors.mpairs(): - if not monitor[].index.isSome(): - continue - let idx = monitor[].index.get() - if state.balances.lenu64 <= idx.uint64: - continue + if self.totals: + var + balance: uint64 + effective_balance: uint64 + slashed: int64 + active: int64 + exited: int64 + withdrawable: int64 + + for monitor in self.monitors.mvalues: + if not monitor[].index.isSome(): + continue - let id = monitor[].id - validator_monitor_balance_gwei.set( - state.balances[idx].toGaugeValue(), [metricId]) - validator_monitor_effective_balance_gwei.set( - state.validators[idx].effective_balance.toGaugeValue(), [metricId]) - validator_monitor_slashed.set( - state.validators[idx].slashed.toGaugeValue(), [metricId]) - validator_monitor_active.set( - is_active_validator(state.validators[idx], current_epoch).toGaugeValue(), [metricId]) - validator_monitor_exited.set( - is_exited_validator(state.validators[idx], current_epoch).toGaugeValue(), [metricId]) - validator_monitor_withdrawable.set( - is_withdrawable_validator(state.validators[idx], current_epoch).toGaugeValue(), [metricId]) - validator_activation_eligibility_epoch.set( - state.validators[idx].activation_eligibility_epoch.toGaugeValue(), [metricId]) - validator_activation_epoch.set( - state.validators[idx].activation_epoch.toGaugeValue(), [metricId]) - validator_exit_epoch.set( - state.validators[idx].exit_epoch.toGaugeValue(), [metricId]) - validator_withdrawable_epoch.set( - 
state.validators[idx].withdrawable_epoch.toGaugeValue(), [metricId]) + let idx = monitor[].index.get() + if state.balances.lenu64 <= idx.uint64: + continue + balance += state.balances[idx] + effective_balance += state.validators[idx].effective_balance + if state.validators[idx].slashed: slashed += 1 + if is_active_validator(state.validators[idx], current_epoch): active += 1 + if is_exited_validator(state.validators[idx], current_epoch): exited += 1 + if is_withdrawable_validator(state.validators[idx], current_epoch): withdrawable += 1 + validator_monitor_balance_gwei.set(balance.toGaugeValue(), [total]) + validator_monitor_effective_balance_gwei.set(effective_balance.toGaugeValue(), [total]) + validator_monitor_slashed.set(slashed, [total]) + validator_monitor_active.set(active, [total]) + validator_monitor_exited.set(exited, [total]) + validator_monitor_withdrawable.set(withdrawable, [total]) + + else: + for monitor in self.monitors.mvalues(): + if not monitor[].index.isSome(): + continue + + let idx = monitor[].index.get() + if state.balances.lenu64 <= idx.uint64: + continue + + let id = monitor[].id + validator_monitor_balance_gwei.set( + state.balances[idx].toGaugeValue(), [id]) + validator_monitor_effective_balance_gwei.set( + state.validators[idx].effective_balance.toGaugeValue(), [id]) + validator_monitor_slashed.set( + state.validators[idx].slashed.toGaugeValue(), [id]) + validator_monitor_active.set( + is_active_validator(state.validators[idx], current_epoch).toGaugeValue(), [id]) + validator_monitor_exited.set( + is_exited_validator(state.validators[idx], current_epoch).toGaugeValue(), [id]) + validator_monitor_withdrawable.set( + is_withdrawable_validator(state.validators[idx], current_epoch).toGaugeValue(), [id]) + validator_activation_eligibility_epoch.set( + state.validators[idx].activation_eligibility_epoch.toGaugeValue(), [id]) + validator_activation_epoch.set( + state.validators[idx].activation_epoch.toGaugeValue(), [id]) + 
validator_exit_epoch.set( + state.validators[idx].exit_epoch.toGaugeValue(), [id]) + validator_withdrawable_epoch.set( + state.validators[idx].withdrawable_epoch.toGaugeValue(), [id]) template withMonitor(self: var ValidatorMonitor, key: ValidatorPubKey, body: untyped): untyped = self.monitors.withValue(key, valuex): @@ -524,7 +618,7 @@ proc registerAttestation*( let id = monitor.id validator_monitor_unaggregated_attestation_total.inc(1, [$src, metricId]) validator_monitor_unaggregated_attestation_delay_seconds.observe( - delay.toFloatSeconds(), [$src, metricId]) + delay.toGaugeValue(), [$src, metricId]) info "Attestation seen", attestation = shortLog(attestation), @@ -549,7 +643,7 @@ proc registerAggregate*( let id = monitor.id validator_monitor_aggregated_attestation_total.inc(1, [$src, metricId]) validator_monitor_aggregated_attestation_delay_seconds.observe( - delay.toFloatSeconds(), [$src, metricId]) + delay.toGaugeValue(), [$src, metricId]) info "Aggregated attestion seen", aggregate = shortLog(signed_aggregate_and_proof.message.aggregate), @@ -564,7 +658,7 @@ proc registerAggregate*( let id = monitor.id validator_monitor_attestation_in_aggregate_total.inc(1, [$src, metricId]) validator_monitor_attestation_in_aggregate_delay_seconds.observe( - delay.toFloatSeconds(), [$src, metricId]) + delay.toGaugeValue(), [$src, metricId]) info "Attestation included in aggregate", aggregate = shortLog(signed_aggregate_and_proof.message.aggregate), @@ -585,7 +679,10 @@ proc registerAttestationInBlock*( epoch = data.slot.epoch validator_monitor_attestation_in_block_total.inc(1, ["block", metricId]) - validator_monitor_attestation_in_block_delay_slots.set(inclusion_lag.int64, ["block", metricId]) + + if not self.totals: + validator_monitor_attestation_in_block_delay_slots.set( + inclusion_lag.int64, ["block", metricId]) info "Attestation included in block", attestation_data = shortLog(data), @@ -611,7 +708,7 @@ proc registerBeaconBlock*( 
validator_monitor_beacon_block_total.inc(1, [$src, metricId]) validator_monitor_beacon_block_delay_seconds.observe( - delay.toFloatSeconds(), [$src, metricId]) + delay.toGaugeValue(), [$src, metricId]) info "Block seen", blck = shortLog(blck), src, epoch = slot.epoch, validator = id @@ -629,7 +726,7 @@ proc registerSyncCommitteeMessage*( validator_monitor_sync_committee_messages_total.inc(1, [$src, metricId]) validator_monitor_sync_committee_messages_delay_seconds.observe( - delay.toFloatSeconds(), [$src, metricId]) + delay.toGaugeValue(), [$src, metricId]) info "Sync committee message seen", syncCommitteeMessage = shortLog(sync_committee_message.beacon_block_root), @@ -655,7 +752,7 @@ proc registerSyncContribution*( let id = monitor.id validator_monitor_sync_contributions_total.inc(1, [$src, metricId]) validator_monitor_sync_contributions_delay_seconds.observe( - delay.toFloatSeconds(), [$src, metricId]) + delay.toGaugeValue(), [$src, metricId]) info "Sync contribution seen", contribution = shortLog(sync_contribution.message.contribution),