diff --git a/db/unfinalized_epochs.go b/db/unfinalized_epochs.go index 95276ff4..87285fba 100644 --- a/db/unfinalized_epochs.go +++ b/db/unfinalized_epochs.go @@ -80,7 +80,7 @@ func StreamUnfinalizedEpochs(epoch uint64, cb func(duty *dbtypes.UnfinalizedEpoc return nil } -func GetUnfinalizedEpochs(epoch uint64) *dbtypes.UnfinalizedEpoch { +func GetUnfinalizedEpoch(epoch uint64, headRoot []byte) *dbtypes.UnfinalizedEpoch { unfinalizedEpoch := dbtypes.UnfinalizedEpoch{} err := ReaderDb.Get(&unfinalizedEpoch, ` SELECT @@ -88,24 +88,8 @@ func GetUnfinalizedEpochs(epoch uint64) *dbtypes.UnfinalizedEpoch { voted_head, voted_total, block_count, orphaned_count, attestation_count, deposit_count, exit_count, withdraw_count, withdraw_amount, attester_slashing_count, proposer_slashing_count, bls_change_count, eth_transaction_count, sync_participation FROM unfinalized_epochs - WHERE epoch = $1 - `, epoch) - if err != nil { - return nil - } - return &unfinalizedEpoch -} - -func GetUnfinalizedEpoch(epoch uint64) *dbtypes.UnfinalizedEpoch { - unfinalizedEpoch := dbtypes.UnfinalizedEpoch{} - err := ReaderDb.Get(&unfinalizedEpoch, ` - SELECT - epoch, dependent_root, epoch_head_root, epoch_head_fork_id, validator_count, validator_balance, eligible, voted_target, - voted_head, voted_total, block_count, orphaned_count, attestation_count, deposit_count, exit_count, withdraw_count, - withdraw_amount, attester_slashing_count, proposer_slashing_count, bls_change_count, eth_transaction_count, sync_participation - FROM unfinalized_epochs - WHERE epoch = $1 - `, epoch) + WHERE epoch = $1 AND epoch_head_root = $2 + `, epoch, headRoot) if err != nil { return nil } diff --git a/indexer/beacon/canonical.go b/indexer/beacon/canonical.go index 1a68f4b1..443f277d 100644 --- a/indexer/beacon/canonical.go +++ b/indexer/beacon/canonical.go @@ -56,7 +56,9 @@ func (indexer *Indexer) GetCanonicalHead(overrideForkId *ForkKey) *Block { factor = 0.5 } percentagesI += chainHeadCandidates[i].PerEpochVotingPercent[k] * factor - percentagesJ += chainHeadCandidates[j].PerEpochVotingPercent[k] * factor + if len(chainHeadCandidates[j].PerEpochVotingPercent) > k { + percentagesJ += chainHeadCandidates[j].PerEpochVotingPercent[k] * factor + } } if percentagesI != percentagesJ { @@ -383,6 +385,10 @@ func (indexer *Indexer) GetCanonicalValidatorSet(overrideForkId *ForkKey) []*v1. break } + if epochStats == nil || epochStats.dependentState == nil || epochStats.dependentState.loadingStatus != 2 { + return validatorSet + } + epochStatsKey := getEpochStatsKey(epochStats.epoch, epochStats.dependentRoot) if cachedValSet, found := indexer.validatorSetCache.Get(epochStatsKey); found { return cachedValSet diff --git a/indexer/beacon/epochcache.go b/indexer/beacon/epochcache.go index fc2aca0d..5a46050d 100644 --- a/indexer/beacon/epochcache.go +++ b/indexer/beacon/epochcache.go @@ -2,7 +2,9 @@ package beacon import ( "bytes" + "crypto/md5" "encoding/binary" + "fmt" "runtime/debug" "sort" "sync" @@ -47,7 +49,7 @@ type epochCache struct { // newEpochCache creates & returns a new instance of epochCache. // initializes the cache & starts the beacon state loader subroutine. func newEpochCache(indexer *Indexer) *epochCache { - votesCacheSize := int(indexer.inMemoryEpochs) * 3 + votesCacheSize := int(indexer.inMemoryEpochs) * 4 if votesCacheSize < 10 { votesCacheSize = 10 } else if votesCacheSize > 200 { @@ -449,10 +451,19 @@ func (cache *epochCache) loadEpochStats(epochStats *EpochStats) bool { } } - return cliA.index < cliB.index + hashA := md5.Sum([]byte(fmt.Sprintf("%v-%v", cliA.client.GetIndex(), epochStats.epoch))) + hashB := md5.Sum([]byte(fmt.Sprintf("%v-%v", cliB.client.GetIndex(), epochStats.epoch))) + return bytes.Compare(hashA[:], hashB[:]) < 0 }) client := clients[int(epochStats.dependentState.retryCount)%len(clients)] + log := cache.indexer.logger.WithField("client", client.client.GetName()) + if epochStats.dependentState.retryCount > 0 { + log = log.WithField("retry", epochStats.dependentState.retryCount) + } + + log.Infof("loading epoch %v stats (dep: %v, req: %v)", epochStats.epoch, epochStats.dependentRoot.String(), len(epochStats.requestedBy)) + err := epochStats.dependentState.loadState(client.getContext(), client, cache) if err != nil && epochStats.dependentState.loadingStatus == 0 { client.logger.Warnf("failed loading epoch %v stats (dep: %v): %v", epochStats.epoch, epochStats.dependentRoot.String(), err) @@ -460,7 +471,8 @@ func (cache *epochCache) loadEpochStats(epochStats *EpochStats) bool { if epochStats.dependentState.loadingStatus != 2 { // epoch state could not be loaded - return true + epochStats.dependentState.retryCount++ + return false } dependentStats := []*EpochStats{} diff --git a/indexer/beacon/epochstats.go b/indexer/beacon/epochstats.go index d83f1c6b..f839c1dd 100644 --- a/indexer/beacon/epochstats.go +++ b/indexer/beacon/epochstats.go @@ -90,6 +90,10 @@ func (es *EpochStats) GetEpoch() phase0.Epoch { return es.epoch } +func (es *EpochStats) GetDependentRoot() phase0.Root { + return es.dependentRoot +} + // addRequestedBy adds a client to the list of clients that have requested this EpochStats. func (es *EpochStats) addRequestedBy(client *Client) bool { es.requestedMutex.Lock() @@ -633,6 +637,10 @@ func (es *EpochStats) GetDbEpoch(indexer *Indexer, headBlock *Block) *dbtypes.Ep return dbEpoch } + + if len(epochBlocks) > 0 { + indexer.logger.Warnf("no pruned epoch aggregation found for epoch %v (head: %v)", es.epoch, epochBlocks[0].Root.String()) + } } // sort blocks ascending diff --git a/indexer/beacon/epochvotes.go b/indexer/beacon/epochvotes.go index 78ca72fd..ea6609cb 100644 --- a/indexer/beacon/epochvotes.go +++ b/indexer/beacon/epochvotes.go @@ -191,7 +191,7 @@ func (indexer *Indexer) aggregateEpochVotes(epoch phase0.Epoch, chainState *cons indexer.epochCache.votesCache.Add(votesKey, votes) - indexer.logger.Debugf("aggregated epoch %v votes in %v (blocks: %v)", epoch, time.Since(t1), len(blocks)) + indexer.logger.Debugf("aggregated epoch %v votes in %v (blocks: %v) [0x%x]", epoch, time.Since(t1), len(blocks), votesKey[:]) return votes }