Skip to content

Commit

Permalink
Merge pull request #179 from ethpandaops/pk910/nft-devnet-fixes
Browse files Browse the repository at this point in the history
Fixes related to nft devnet
  • Loading branch information
pk910 authored Nov 25, 2024
2 parents 58e4b0b + 326c505 commit 217a5bf
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 24 deletions.
22 changes: 3 additions & 19 deletions db/unfinalized_epochs.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,32 +80,16 @@ func StreamUnfinalizedEpochs(epoch uint64, cb func(duty *dbtypes.UnfinalizedEpoc
return nil
}

func GetUnfinalizedEpochs(epoch uint64) *dbtypes.UnfinalizedEpoch {
func GetUnfinalizedEpoch(epoch uint64, headRoot []byte) *dbtypes.UnfinalizedEpoch {
unfinalizedEpoch := dbtypes.UnfinalizedEpoch{}
err := ReaderDb.Get(&unfinalizedEpoch, `
SELECT
epoch, dependent_root, epoch_head_root, epoch_head_fork_id, validator_count, validator_balance, eligible, voted_target,
voted_head, voted_total, block_count, orphaned_count, attestation_count, deposit_count, exit_count, withdraw_count,
withdraw_amount, attester_slashing_count, proposer_slashing_count, bls_change_count, eth_transaction_count, sync_participation
FROM unfinalized_epochs
WHERE epoch = $1
`, epoch)
if err != nil {
return nil
}
return &unfinalizedEpoch
}

func GetUnfinalizedEpoch(epoch uint64) *dbtypes.UnfinalizedEpoch {
unfinalizedEpoch := dbtypes.UnfinalizedEpoch{}
err := ReaderDb.Get(&unfinalizedEpoch, `
SELECT
epoch, dependent_root, epoch_head_root, epoch_head_fork_id, validator_count, validator_balance, eligible, voted_target,
voted_head, voted_total, block_count, orphaned_count, attestation_count, deposit_count, exit_count, withdraw_count,
withdraw_amount, attester_slashing_count, proposer_slashing_count, bls_change_count, eth_transaction_count, sync_participation
FROM unfinalized_epochs
WHERE epoch = $1
`, epoch)
WHERE epoch = $1 AND epoch_head_root = $2
`, epoch, headRoot)
if err != nil {
return nil
}
Expand Down
8 changes: 7 additions & 1 deletion indexer/beacon/canonical.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ func (indexer *Indexer) GetCanonicalHead(overrideForkId *ForkKey) *Block {
factor = 0.5
}
percentagesI += chainHeadCandidates[i].PerEpochVotingPercent[k] * factor
percentagesJ += chainHeadCandidates[j].PerEpochVotingPercent[k] * factor
if len(chainHeadCandidates[j].PerEpochVotingPercent) > k {
percentagesJ += chainHeadCandidates[j].PerEpochVotingPercent[k] * factor
}
}

if percentagesI != percentagesJ {
Expand Down Expand Up @@ -383,6 +385,10 @@ func (indexer *Indexer) GetCanonicalValidatorSet(overrideForkId *ForkKey) []*v1.
break
}

if epochStats == nil || epochStats.dependentState == nil || epochStats.dependentState.loadingStatus != 2 {
return validatorSet
}

epochStatsKey := getEpochStatsKey(epochStats.epoch, epochStats.dependentRoot)
if cachedValSet, found := indexer.validatorSetCache.Get(epochStatsKey); found {
return cachedValSet
Expand Down
18 changes: 15 additions & 3 deletions indexer/beacon/epochcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package beacon

import (
"bytes"
"crypto/md5"
"encoding/binary"
"fmt"
"runtime/debug"
"sort"
"sync"
Expand Down Expand Up @@ -47,7 +49,7 @@ type epochCache struct {
// newEpochCache creates & returns a new instance of epochCache.
// initializes the cache & starts the beacon state loader subroutine.
func newEpochCache(indexer *Indexer) *epochCache {
votesCacheSize := int(indexer.inMemoryEpochs) * 3
votesCacheSize := int(indexer.inMemoryEpochs) * 4
if votesCacheSize < 10 {
votesCacheSize = 10
} else if votesCacheSize > 200 {
Expand Down Expand Up @@ -449,18 +451,28 @@ func (cache *epochCache) loadEpochStats(epochStats *EpochStats) bool {
}
}

return cliA.index < cliB.index
hashA := md5.Sum([]byte(fmt.Sprintf("%v-%v", cliA.client.GetIndex(), epochStats.epoch)))
hashB := md5.Sum([]byte(fmt.Sprintf("%v-%v", cliB.client.GetIndex(), epochStats.epoch)))
return bytes.Compare(hashA[:], hashB[:]) < 0
})

client := clients[int(epochStats.dependentState.retryCount)%len(clients)]
log := cache.indexer.logger.WithField("client", client.client.GetName())
if epochStats.dependentState.retryCount > 0 {
log = log.WithField("retry", epochStats.dependentState.retryCount)
}

log.Infof("loading epoch %v stats (dep: %v, req: %v)", epochStats.epoch, epochStats.dependentRoot.String(), len(epochStats.requestedBy))

err := epochStats.dependentState.loadState(client.getContext(), client, cache)
if err != nil && epochStats.dependentState.loadingStatus == 0 {
client.logger.Warnf("failed loading epoch %v stats (dep: %v): %v", epochStats.epoch, epochStats.dependentRoot.String(), err)
}

if epochStats.dependentState.loadingStatus != 2 {
// epoch state could not be loaded
return true
epochStats.dependentState.retryCount++
return false
}

dependentStats := []*EpochStats{}
Expand Down
8 changes: 8 additions & 0 deletions indexer/beacon/epochstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ func (es *EpochStats) GetEpoch() phase0.Epoch {
return es.epoch
}

func (es *EpochStats) GetDependentRoot() phase0.Root {
return es.dependentRoot
}

// addRequestedBy adds a client to the list of clients that have requested this EpochStats.
func (es *EpochStats) addRequestedBy(client *Client) bool {
es.requestedMutex.Lock()
Expand Down Expand Up @@ -633,6 +637,10 @@ func (es *EpochStats) GetDbEpoch(indexer *Indexer, headBlock *Block) *dbtypes.Ep

return dbEpoch
}

if len(epochBlocks) > 0 {
indexer.logger.Warnf("no pruned epoch aggregation found for epoch %v (head: %v)", es.epoch, epochBlocks[0].Root.String())
}
}

// sort blocks ascending
Expand Down
2 changes: 1 addition & 1 deletion indexer/beacon/epochvotes.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (indexer *Indexer) aggregateEpochVotes(epoch phase0.Epoch, chainState *cons

indexer.epochCache.votesCache.Add(votesKey, votes)

indexer.logger.Debugf("aggregated epoch %v votes in %v (blocks: %v)", epoch, time.Since(t1), len(blocks))
indexer.logger.Debugf("aggregated epoch %v votes in %v (blocks: %v) [0x%x]", epoch, time.Since(t1), len(blocks), votesKey[:])
return votes
}

Expand Down

0 comments on commit 217a5bf

Please sign in to comment.