Skip to content

Commit

Permalink
Minor refactor elapsed time measurements in ProcessEpoch (#12545)
Browse files Browse the repository at this point in the history
  • Loading branch information
domiwei authored Oct 30, 2024
1 parent a4ab8a1 commit e1f959c
Show file tree
Hide file tree
Showing 12 changed files with 37 additions and 83 deletions.
73 changes: 18 additions & 55 deletions cl/monitor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ var (
executionTime = metrics.GetOrCreateGauge("execution_time")

// Epoch processing metrics
epochProcessingTime = metrics.GetOrCreateGauge("epoch_processing_time")
processJustificationBitsAndFinalityTime = metrics.GetOrCreateGauge("process_justification_bits_and_finality_time")
EpochProcessingTime = metrics.GetOrCreateGauge("epoch_processing_time")
ProcessJustificationBitsAndFinalityTime = metrics.GetOrCreateGauge("process_justification_bits_and_finality_time")
ProcessInactivityScoresTime = metrics.GetOrCreateGauge("process_inactivity_ccores_time")
processRewardsAndPenaltiesTime = metrics.GetOrCreateGauge("process_rewards_and_penalties_time")
processRegistryUpdatesTime = metrics.GetOrCreateGauge("process_registry_updates_time")
processSlashingsTime = metrics.GetOrCreateGauge("process_slashings_time")
processEffectiveBalanceUpdatesTime = metrics.GetOrCreateGauge("process_effective_balance_updates_time")
processHistoricalRootsUpdateTime = metrics.GetOrCreateGauge("process_historical_roots_update_time")
processParticipationFlagUpdatesTime = metrics.GetOrCreateGauge("process_participation_flag_updates_time")
processSyncCommitteeUpdateTime = metrics.GetOrCreateGauge("process_sync_committee_update_time")
ProcessRewardsAndPenaltiesTime = metrics.GetOrCreateGauge("process_rewards_and_penalties_time")
ProcessRegistryUpdatesTime = metrics.GetOrCreateGauge("process_registry_updates_time")
ProcessSlashingsTime = metrics.GetOrCreateGauge("process_slashings_time")
ProcessEffectiveBalanceUpdatesTime = metrics.GetOrCreateGauge("process_effective_balance_updates_time")
ProcessHistoricalRootsUpdateTime = metrics.GetOrCreateGauge("process_historical_roots_update_time")
ProcessParticipationFlagUpdatesTime = metrics.GetOrCreateGauge("process_participation_flag_updates_time")
ProcessSyncCommitteeUpdateTime = metrics.GetOrCreateGauge("process_sync_committee_update_time")

// Network metrics
gossipTopicsMetricCounterPrefix = "gossip_topics_seen"
Expand Down Expand Up @@ -129,61 +129,24 @@ func ObserveNumberOfAggregateSignatures(signatures int) {
aggregateAndProofSignatures.Add(float64(signatures))
}

// ObserveEpochProcessingTime sets last epoch processing time
func ObserveEpochProcessingTime(startTime time.Time) {
epochProcessingTime.Set(float64(time.Since(startTime).Microseconds()))
type TimeMeasure struct {
start time.Time
metric metrics.Gauge
}

// ObserveProcessJustificationBitsAndFinalityTime sets ProcessJustificationBitsAndFinality time
func ObserveProcessJustificationBitsAndFinalityTime(startTime time.Time) {
processJustificationBitsAndFinalityTime.Set(float64(time.Since(startTime).Microseconds()))
func (m TimeMeasure) End() {
m.metric.Set(float64(time.Since(m.start).Microseconds()))
}

func ObserveElaspedTime(m metrics.Gauge) TimeMeasure {
return TimeMeasure{start: time.Now(), metric: m}
}

// ObserveAggregateAttestation sets the time it took add new attestation to aggregateAndProof
func ObserveAggregateAttestation(startTime time.Time) {
aggregateAttestation.Set(float64(time.Since(startTime).Microseconds()))
}

// ObserveProcessRewardsAndPenaltiesTime sets ProcessRewardsAndPenalties time
func ObserveProcessRewardsAndPenaltiesTime(startTime time.Time) {
processRewardsAndPenaltiesTime.Set(float64(time.Since(startTime).Microseconds()))
}

// ObserveProcessParticipationFlagUpdatesTime sets ProcessParticipationFlagUpdates time
func ObserveProcessParticipationFlagUpdatesTime(startTime time.Time) {
processParticipationFlagUpdatesTime.Set(float64(time.Since(startTime).Microseconds()))
}

// ObserveProcessInactivityScoresTime sets ProcessJustificationBitsAndFinality time
func ObserveProcessInactivityScoresTime(startTime time.Time) {
ProcessInactivityScoresTime.Set(float64(time.Since(startTime).Microseconds()))
}

// ObserveProcessHistoricalRootsUpdateTime sets ProcessHistoricalRootsUpdate time
func ObserveProcessHistoricalRootsUpdateTime(startTime time.Time) {
processHistoricalRootsUpdateTime.Set(float64(time.Since(startTime).Microseconds()))
}

// ObserveProcessSyncCommitteeUpdateTime sets ProcessSyncCommitteeUpdate time
func ObserveProcessSyncCommitteeUpdateTime(startTime time.Time) {
processSyncCommitteeUpdateTime.Set(float64(time.Since(startTime).Microseconds()))
}

// ObserveProcessEffectiveBalanceUpdatesTime sets ProcessEffectiveBalanceUpdates time
func ObserveProcessEffectiveBalanceUpdatesTime(startTime time.Time) {
processEffectiveBalanceUpdatesTime.Set(float64(time.Since(startTime).Microseconds()))
}

// ObserveProcessRegistryUpdatesTime sets ProcessRegistryUpdates time
func ObserveProcessRegistryUpdatesTime(startTime time.Time) {
processRegistryUpdatesTime.Set(float64(time.Since(startTime).Microseconds()))
}

// ObserveProcessSlashingsTime sets ProcessSlashings time
func ObserveProcessSlashingsTime(startTime time.Time) {
processSlashingsTime.Set(float64(time.Since(startTime).Microseconds()))
}

// ObserveAttestHit increments the attestation hit metric
func ObserveAttestationBlockProcessingTime(startTime time.Time) {
attestationBlockProcessingTime.Set(microToMilli(time.Since(startTime).Microseconds()))
Expand Down
2 changes: 0 additions & 2 deletions cl/transition/impl/eth2/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,6 @@ func (I *impl) ProcessSlots(s abstract.BeaconState, slot uint64) error {

if (sSlot+1)%beaconConfig.SlotsPerEpoch == 0 {
start := time.Now()

if err := statechange.ProcessEpoch(s); err != nil {
return err
}
Expand All @@ -873,7 +872,6 @@ func (I *impl) ProcessSlots(s abstract.BeaconState, slot uint64) error {
"process_epoch_elpsed",
time.Since(start),
)
monitor.ObserveEpochProcessingTime(start)
}

sSlot += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/erigontech/erigon/cl/clparams"
"github.com/erigontech/erigon/cl/cltypes"
"github.com/erigontech/erigon/cl/cltypes/solid"
"github.com/erigontech/erigon/cl/monitor"
"github.com/erigontech/erigon/cl/phase1/core/state"
"github.com/erigontech/erigon/cl/utils/threading"
)
Expand Down Expand Up @@ -79,6 +80,7 @@ func weighJustificationAndFinalization(s abstract.BeaconState, previousEpochTarg
}

func ProcessJustificationBitsAndFinality(s abstract.BeaconState, unslashedParticipatingIndicies [][]bool) error {
defer monitor.ObserveElaspedTime(monitor.ProcessJustificationBitsAndFinalityTime).End()
currentEpoch := state.Epoch(s)
beaconConfig := s.BeaconConfig()
// Skip for first 2 epochs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ package statechange
import (
"github.com/erigontech/erigon/cl/abstract"
"github.com/erigontech/erigon/cl/cltypes/solid"
"github.com/erigontech/erigon/cl/monitor"
)

// ProcessEffectiveBalanceUpdates updates the effective balance of validators. Specs at: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#effective-balances-updates
func ProcessEffectiveBalanceUpdates(state abstract.BeaconState) error {
defer monitor.ObserveElaspedTime(monitor.ProcessEffectiveBalanceUpdatesTime).End()
beaconConfig := state.BeaconConfig()
// Define non-changing constants to avoid recomputation.
histeresisIncrement := beaconConfig.EffectiveBalanceIncrement / beaconConfig.HysteresisQuotient
Expand Down
27 changes: 1 addition & 26 deletions cl/transition/impl/eth2/statechange/process_epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package statechange

import (
"runtime"
"time"

"github.com/erigontech/erigon/cl/abstract"
"github.com/erigontech/erigon/cl/clparams"
Expand Down Expand Up @@ -47,64 +46,45 @@ func GetUnslashedIndiciesSet(cfg *clparams.BeaconChainConfig, previousEpoch uint

// ProcessEpoch process epoch transition.
func ProcessEpoch(s abstract.BeaconState) error {
defer monitor.ObserveElaspedTime(monitor.EpochProcessingTime).End()
eligibleValidators := state.EligibleValidatorsIndicies(s)
var unslashedIndiciesSet [][]bool
if s.Version() >= clparams.AltairVersion {
unslashedIndiciesSet = GetUnslashedIndiciesSet(s.BeaconConfig(), state.PreviousEpoch(s), s.ValidatorSet(), s.PreviousEpochParticipation())
}
start := time.Now()
if err := ProcessJustificationBitsAndFinality(s, unslashedIndiciesSet); err != nil {
return err
}
monitor.ObserveProcessJustificationBitsAndFinalityTime(start)
// fmt.Println("ProcessJustificationBitsAndFinality", time.Since(start))

if s.Version() >= clparams.AltairVersion {
start = time.Now()
if err := ProcessInactivityScores(s, eligibleValidators, unslashedIndiciesSet); err != nil {
return err
}
monitor.ObserveProcessInactivityScoresTime(start)
}

// fmt.Println("ProcessInactivityScores", time.Since(start))
start = time.Now()
if err := ProcessRewardsAndPenalties(s, eligibleValidators, unslashedIndiciesSet); err != nil {
return err
}
monitor.ObserveProcessRewardsAndPenaltiesTime(start)

// fmt.Println("ProcessRewardsAndPenalties", time.Since(start))
start = time.Now()
if err := ProcessRegistryUpdates(s); err != nil {
return err
}
monitor.ObserveProcessRegistryUpdatesTime(start)

// fmt.Println("ProcessRegistryUpdates", time.Since(start))
start = time.Now()
if err := ProcessSlashings(s); err != nil {
return err
}
monitor.ObserveProcessSlashingsTime(start)

// fmt.Println("ProcessSlashings", time.Since(start))
ProcessEth1DataReset(s)
start = time.Now()
if err := ProcessEffectiveBalanceUpdates(s); err != nil {
return err
}
monitor.ObserveProcessEffectiveBalanceUpdatesTime(start)

// fmt.Println("ProcessEffectiveBalanceUpdates", time.Since(start))
ProcessSlashingsReset(s)
ProcessRandaoMixesReset(s)

start = time.Now()
if err := ProcessHistoricalRootsUpdate(s); err != nil {
return err
}
monitor.ObserveProcessHistoricalRootsUpdateTime(start)

if s.Version() == clparams.Phase0Version {
if err := ProcessParticipationRecordUpdates(s); err != nil {
Expand All @@ -113,15 +93,10 @@ func ProcessEpoch(s abstract.BeaconState) error {
}

if s.Version() >= clparams.AltairVersion {
start = time.Now()
ProcessParticipationFlagUpdates(s)
monitor.ObserveProcessParticipationFlagUpdatesTime(start)

start = time.Now()
if err := ProcessSyncCommitteeUpdate(s); err != nil {
return err
}
monitor.ObserveProcessSyncCommitteeUpdateTime(start)
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import (
"github.com/erigontech/erigon/cl/abstract"
"github.com/erigontech/erigon/cl/clparams"
"github.com/erigontech/erigon/cl/cltypes"
"github.com/erigontech/erigon/cl/monitor"
"github.com/erigontech/erigon/cl/phase1/core/state"
"github.com/erigontech/erigon/cl/utils"
)

// ProcessHistoricalRootsUpdate updates the historical root data structure by computing a new historical root batch when it is time to do so.
func ProcessHistoricalRootsUpdate(s abstract.BeaconState) error {
defer monitor.ObserveElaspedTime(monitor.ProcessHistoricalRootsUpdateTime).End()
nextEpoch := state.Epoch(s) + 1
beaconConfig := s.BeaconConfig()
blockRoots := s.BlockRoots()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import (
"runtime"

"github.com/erigontech/erigon/cl/abstract"
"github.com/erigontech/erigon/cl/monitor"
"github.com/erigontech/erigon/cl/phase1/core/state"
"github.com/erigontech/erigon/cl/utils/threading"
)

// ProcessInactivityScores will updates the inactivity registry of each validator.
func ProcessInactivityScores(s abstract.BeaconState, eligibleValidatorsIndicies []uint64, unslashedIndicies [][]bool) error {
defer monitor.ObserveElaspedTime(monitor.ProcessInactivityScoresTime).End()
if state.Epoch(s) == s.BeaconConfig().GenesisEpoch {
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"

"github.com/erigontech/erigon/cl/abstract"
"github.com/erigontech/erigon/cl/monitor"
"github.com/erigontech/erigon/cl/utils/threading"

"github.com/erigontech/erigon/cl/phase1/core/state"
Expand All @@ -41,6 +42,7 @@ type minimizeQueuedValidator struct {

// ProcessRegistryUpdates updates every epoch the activation status of validators. Specs at: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#registry-updates.
func ProcessRegistryUpdates(s abstract.BeaconState) error {
defer monitor.ObserveElaspedTime(monitor.ProcessRegistryUpdatesTime).End()
beaconConfig := s.BeaconConfig()
currentEpoch := state.Epoch(s)
// start also initializing the activation queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/erigontech/erigon/cl/abstract"
"github.com/erigontech/erigon/cl/clparams"
"github.com/erigontech/erigon/cl/cltypes/solid"
"github.com/erigontech/erigon/cl/monitor"
"github.com/erigontech/erigon/cl/phase1/core/state"
"github.com/erigontech/erigon/cl/utils/threading"
)
Expand Down Expand Up @@ -285,6 +286,7 @@ func processRewardsAndPenaltiesPhase0(s abstract.BeaconState, eligibleValidators

// ProcessRewardsAndPenalties applies rewards/penalties accumulated during previous epoch.
func ProcessRewardsAndPenalties(s abstract.BeaconState, eligibleValidators []uint64, unslashedIndicies [][]bool) error {
defer monitor.ObserveElaspedTime(monitor.ProcessRewardsAndPenaltiesTime).End()
if state.Epoch(s) == s.BeaconConfig().GenesisEpoch {
return nil
}
Expand Down
2 changes: 2 additions & 0 deletions cl/transition/impl/eth2/statechange/process_slashings.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/erigontech/erigon/cl/abstract"
"github.com/erigontech/erigon/cl/clparams"
"github.com/erigontech/erigon/cl/monitor"
"github.com/erigontech/erigon/cl/phase1/core/state"
"github.com/erigontech/erigon/cl/utils/threading"
)
Expand Down Expand Up @@ -56,6 +57,7 @@ func processSlashings(s abstract.BeaconState, slashingMultiplier uint64) error {
}

func ProcessSlashings(state abstract.BeaconState) error {
defer monitor.ObserveElaspedTime(monitor.ProcessSlashingsTime).End()
// Depending on the version of the state, use different multipliers
switch state.Version() {
case clparams.Phase0Version:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ package statechange

import (
"github.com/erigontech/erigon/cl/abstract"
"github.com/erigontech/erigon/cl/monitor"
"github.com/erigontech/erigon/cl/phase1/core/state"
)

// ProcessSyncCommitteeUpdate implements processing for the sync committee update. unfortunately there is no easy way to test it.
func ProcessSyncCommitteeUpdate(s abstract.BeaconState) error {
defer monitor.ObserveElaspedTime(monitor.ProcessSyncCommitteeUpdateTime).End()
if (state.Epoch(s)+1)%s.BeaconConfig().EpochsPerSyncCommitteePeriod != 0 {
return nil
}
Expand Down
2 changes: 2 additions & 0 deletions cl/transition/impl/eth2/statechange/resets.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package statechange

import (
"github.com/erigontech/erigon/cl/abstract"
"github.com/erigontech/erigon/cl/monitor"
"github.com/erigontech/erigon/cl/phase1/core/state"
)

Expand All @@ -40,5 +41,6 @@ func ProcessRandaoMixesReset(s abstract.BeaconState) {
}

func ProcessParticipationFlagUpdates(state abstract.BeaconState) {
defer monitor.ObserveElaspedTime(monitor.ProcessParticipationFlagUpdatesTime).End()
state.ResetEpochParticipation()
}

0 comments on commit e1f959c

Please sign in to comment.