From a2be5f199da67f29595aea1153233795ff331b9c Mon Sep 17 00:00:00 2001 From: algonautshant Date: Mon, 14 Jun 2021 19:01:04 -0400 Subject: [PATCH 1/4] In this chage, fixes to peer selector and the test. peerSelector.go: various bug fixes - introduce peerSelectorPeer to wrap the network.Peer and add peerClass information, to be able to distinguish between peers of the same address but different classes. - keep track of download failures to be able to increase the cost of each failure when failing more than succeeding. This is to evict the peer faster when constantly failing to download. - initialize rankSum and rankSamples to initialRank of the class. Otherwise, the peer rank will have a very long warmup time before relfecting the correct rank. - let resetRequestPenalty bound the rank within the class bounds. Otherwise, the penalty calculation pushes the rank out of the class bounds (bug). - getNextPeer is local to the package - getNextPeer, PeerDownloadDurationToRank and RankPeer use peerSelectorPeer instead of network.Peer - refreshAvailablePeers distinguishes between peers with the same address but of different peer class - findPeer returns the peer given the address and the peer class (instead of just the address) catchpointCatchup_test.go: - Remove comment about giving the second node all the stake, since it is not the case here. - Use the round from the catchpoint instead of guessing the round as 36. In case the following catchpoint was obtained due to race conditions, checking for round 37 will be trivial, since it will also be obtained from the catchpoint. catchpointService.go and service.go: - Update the code to use peerSelectorPeer instead of network.Peer with peerSelector peerSelector_test.go: - Update the tests to use peerSelectorPeer instead of network.Peer with peerSelector - Cleanup debugging printouts. --- catchup/catchpointService.go | 60 +++++----- catchup/peerSelector.go | 112 +++++++++++------- catchup/peerSelector_test.go | 97 +++++++-------- catchup/service.go | 22 ++-- .../catchup/catchpointCatchup_test.go | 7 +- 5 files changed, 157 insertions(+), 141 deletions(-) diff --git a/catchup/catchpointService.go b/catchup/catchpointService.go index c6bdf8ac0b..a682a4a428 100644 --- a/catchup/catchpointService.go +++ b/catchup/catchpointService.go @@ -281,7 +281,8 @@ func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) { } return cs.abort(fmt.Errorf("processStageLedgerDownload failed to reset staging balances : %v", err)) } - peer, err := peerSelector.GetNextPeer() + psp, err := peerSelector.getNextPeer() + peer := psp.Peer if err != nil { err = fmt.Errorf("processStageLedgerDownload: catchpoint catchup was unable to obtain a list of peers to retrieve the catchpoint file from") return cs.abort(err) @@ -293,9 +294,9 @@ func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) { break } // failed to build the merkle trie for the above catchpoint file. - peerSelector.RankPeer(peer, peerRankInvalidDownload) + peerSelector.RankPeer(psp, peerRankInvalidDownload) } else { - peerSelector.RankPeer(peer, peerRankDownloadFailed) + peerSelector.RankPeer(psp, peerRankDownloadFailed) } // instead of testing for err == cs.ctx.Err() , we'll check on the context itself. @@ -345,11 +346,11 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err erro for { attemptsCount++ - peer := network.Peer(0) + var psp *peerSelectorPeer blockDownloadDuration := time.Duration(0) if blk == nil { var stop bool - blk, blockDownloadDuration, peer, stop, err = cs.fetchBlock(blockRound, uint64(attemptsCount)) + blk, blockDownloadDuration, psp, stop, err = cs.fetchBlock(blockRound, uint64(attemptsCount)) if stop { return err } else if blk == nil { @@ -364,7 +365,7 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err erro if attemptsCount <= cs.config.CatchupBlockDownloadRetryAttempts { // try again. blk = nil - cs.blocksDownloadPeerSelector.RankPeer(peer, peerRankInvalidDownload) + cs.blocksDownloadPeerSelector.RankPeer(psp, peerRankInvalidDownload) continue } return cs.abort(fmt.Errorf("processStageLastestBlockDownload: unsupported protocol version detected: '%v'", blk.BlockHeader.CurrentProtocol)) @@ -376,7 +377,7 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err erro if attemptsCount <= cs.config.CatchupBlockDownloadRetryAttempts { // try again. blk = nil - cs.blocksDownloadPeerSelector.RankPeer(peer, peerRankInvalidDownload) + cs.blocksDownloadPeerSelector.RankPeer(psp, peerRankInvalidDownload) continue } return cs.abort(fmt.Errorf("processStageLastestBlockDownload: genesis hash mismatches : genesis hash on genesis.json file is %v while genesis hash of downloaded block is %v", cs.ledger.GenesisHash(), blk.GenesisHash())) @@ -389,7 +390,7 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err erro if attemptsCount <= cs.config.CatchupBlockDownloadRetryAttempts { // try again. blk = nil - cs.blocksDownloadPeerSelector.RankPeer(peer, peerRankInvalidDownload) + cs.blocksDownloadPeerSelector.RankPeer(psp, peerRankInvalidDownload) continue } return cs.abort(fmt.Errorf("processStageLastestBlockDownload: downloaded block content does not match downloaded block header")) @@ -405,14 +406,14 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err erro // try again. blk = nil cs.log.Infof("processStageLastestBlockDownload: block %d verification against catchpoint failed, another attempt will be made; err = %v", blockRound, err) - cs.blocksDownloadPeerSelector.RankPeer(peer, peerRankInvalidDownload) + cs.blocksDownloadPeerSelector.RankPeer(psp, peerRankInvalidDownload) continue } return cs.abort(fmt.Errorf("processStageLastestBlockDownload failed when calling VerifyCatchpoint : %v", err)) } // give a rank to the download, as the download was successful. - peerRank := cs.blocksDownloadPeerSelector.PeerDownloadDurationToRank(peer, blockDownloadDuration) - cs.blocksDownloadPeerSelector.RankPeer(peer, peerRank) + peerRank := cs.blocksDownloadPeerSelector.PeerDownloadDurationToRank(psp, blockDownloadDuration) + cs.blocksDownloadPeerSelector.RankPeer(psp, peerRank) err = cs.ledgerAccessor.StoreBalancesRound(cs.ctx, blk) if err != nil { @@ -495,11 +496,11 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) { } } - peer := network.Peer(0) + var psp *peerSelectorPeer blockDownloadDuration := time.Duration(0) if blk == nil { var stop bool - blk, blockDownloadDuration, peer, stop, err = cs.fetchBlock(topBlock.Round()-basics.Round(blocksFetched), retryCount) + blk, blockDownloadDuration, psp, stop, err = cs.fetchBlock(topBlock.Round()-basics.Round(blocksFetched), retryCount) if stop { return err } else if blk == nil { @@ -515,7 +516,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) { // not identical, retry download. cs.log.Warnf("processStageBlocksDownload downloaded block(%d) did not match it's successor(%d) block hash %v != %v", blk.Round(), prevBlock.Round(), blk.Hash(), prevBlock.BlockHeader.Branch) cs.updateBlockRetrievalStatistics(-1, 0) - cs.blocksDownloadPeerSelector.RankPeer(peer, peerRankInvalidDownload) + cs.blocksDownloadPeerSelector.RankPeer(psp, peerRankInvalidDownload) if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) { // try again. retryCount++ @@ -528,7 +529,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) { if _, ok := config.Consensus[blk.BlockHeader.CurrentProtocol]; !ok { cs.log.Warnf("processStageBlocksDownload: unsupported protocol version detected: '%v'", blk.BlockHeader.CurrentProtocol) cs.updateBlockRetrievalStatistics(-1, 0) - cs.blocksDownloadPeerSelector.RankPeer(peer, peerRankInvalidDownload) + cs.blocksDownloadPeerSelector.RankPeer(psp, peerRankInvalidDownload) if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) { // try again. retryCount++ @@ -541,7 +542,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) { if !blk.ContentsMatchHeader() { cs.log.Warnf("processStageBlocksDownload: downloaded block content does not match downloaded block header") // try again. - cs.blocksDownloadPeerSelector.RankPeer(peer, peerRankInvalidDownload) + cs.blocksDownloadPeerSelector.RankPeer(psp, peerRankInvalidDownload) cs.updateBlockRetrievalStatistics(-1, 0) if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) { // try again. @@ -552,8 +553,8 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) { } cs.updateBlockRetrievalStatistics(0, 1) - peerRank := cs.blocksDownloadPeerSelector.PeerDownloadDurationToRank(peer, blockDownloadDuration) - cs.blocksDownloadPeerSelector.RankPeer(peer, peerRank) + peerRank := cs.blocksDownloadPeerSelector.PeerDownloadDurationToRank(psp, blockDownloadDuration) + cs.blocksDownloadPeerSelector.RankPeer(psp, peerRank) // all good, persist and move on. err = cs.ledgerAccessor.StoreBlock(cs.ctx, blk) @@ -581,39 +582,40 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) { // fetchBlock uses the internal peer selector blocksDownloadPeerSelector to pick a peer and then attempt to fetch the block requested from that peer. // The method return stop=true if the caller should exit the current operation // If the method return a nil block, the caller is expected to retry the operation, increasing the retry counter as needed. -func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount uint64) (blk *bookkeeping.Block, downloadDuration time.Duration, peer network.Peer, stop bool, err error) { - peer, err = cs.blocksDownloadPeerSelector.GetNextPeer() +func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount uint64) (blk *bookkeeping.Block, downloadDuration time.Duration, psp *peerSelectorPeer, stop bool, err error) { + psp, err = cs.blocksDownloadPeerSelector.getNextPeer() + peer := psp.Peer if err != nil { err = fmt.Errorf("fetchBlock: unable to obtain a list of peers to retrieve the latest block from") - return nil, time.Duration(0), peer, true, cs.abort(err) + return nil, time.Duration(0), psp, true, cs.abort(err) } httpPeer, validPeer := peer.(network.HTTPPeer) if !validPeer { cs.log.Warnf("fetchBlock: non-HTTP peer was provided by the peer selector") - cs.blocksDownloadPeerSelector.RankPeer(peer, peerRankInvalidDownload) + cs.blocksDownloadPeerSelector.RankPeer(psp, peerRankInvalidDownload) if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) { // try again. - return nil, time.Duration(0), peer, false, nil + return nil, time.Duration(0), psp, false, nil } - return nil, time.Duration(0), peer, true, cs.abort(fmt.Errorf("fetchBlock: recurring non-HTTP peer was provided by the peer selector")) + return nil, time.Duration(0), psp, true, cs.abort(fmt.Errorf("fetchBlock: recurring non-HTTP peer was provided by the peer selector")) } fetcher := makeUniversalBlockFetcher(cs.log, cs.net, cs.config) blk, _, downloadDuration, err = fetcher.fetchBlock(cs.ctx, round, httpPeer) if err != nil { if cs.ctx.Err() != nil { - return nil, time.Duration(0), peer, true, cs.stopOrAbort() + return nil, time.Duration(0), psp, true, cs.stopOrAbort() } if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) { // try again. cs.log.Infof("Failed to download block %d on attempt %d out of %d. %v", round, retryCount, cs.config.CatchupBlockDownloadRetryAttempts, err) - cs.blocksDownloadPeerSelector.RankPeer(peer, peerRankDownloadFailed) - return nil, time.Duration(0), peer, false, nil + cs.blocksDownloadPeerSelector.RankPeer(psp, peerRankDownloadFailed) + return nil, time.Duration(0), psp, false, nil } - return nil, time.Duration(0), peer, true, cs.abort(fmt.Errorf("fetchBlock failed after multiple blocks download attempts")) + return nil, time.Duration(0), psp, true, cs.abort(fmt.Errorf("fetchBlock failed after multiple blocks download attempts")) } // success - return blk, downloadDuration, peer, false, nil + return blk, downloadDuration, psp, false, nil } // processStageLedgerDownload is the fifth catchpoint catchup stage. It completes the catchup process, swap the new tables and restart the node functionality. diff --git a/catchup/peerSelector.go b/catchup/peerSelector.go index d8b3313d92..b38d05aade 100644 --- a/catchup/peerSelector.go +++ b/catchup/peerSelector.go @@ -65,6 +65,13 @@ const ( var errPeerSelectorNoPeerPoolsAvailable = errors.New("no peer pools available") +// peerSelectorPeer is a peer wrapper, adding to it the peerClass information +// There can be multiple peer elements with the same address but different peer classes, and it is important to distinguish between them. +type peerSelectorPeer struct { + network.Peer + peerClass network.PeerOption +} + // peerClass defines the type of peer we want to have in a particular "class", // and define the network.PeerOption that would be used to retrieve that type of // peer @@ -112,18 +119,19 @@ type peerSelector struct { // peer in the past, and to be forgiving of occasional performance // variations which may not be representative of the peer's overall // performance. It also stores the penalty history in the form or peer -// selection gaps. +// selection gaps, and the count of peerRankDownloadFailed incidents. type historicStats struct { - windowSize int - rankSamples []int - rankSum uint64 - requestGaps []uint64 - gapSum float64 - counter uint64 + windowSize int + rankSamples []int + rankSum uint64 + requestGaps []uint64 + gapSum float64 + counter uint64 + downloadFailures int } -func makeHistoricStatus(windowSize int) *historicStats { - // Initialize the window (rankSamples) with zeros. +func makeHistoricStatus(windowSize int, class peerClass) *historicStats { + // Initialize the window (rankSamples) with zeros but rankSum with the equivalent sum of class initial rank. // This way, every peer will slowly build up its profile. // Otherwise, if the best peer gets a bad download the first time, // that will determine the rank of the peer. @@ -131,8 +139,11 @@ func makeHistoricStatus(windowSize int) *historicStats { windowSize: windowSize, rankSamples: make([]int, windowSize, windowSize), requestGaps: make([]uint64, 0, windowSize), - rankSum: 0, + rankSum: uint64(class.initialRank) * uint64(windowSize), gapSum: 0.0} + for i := 0; i < windowSize; i++ { + hs.rankSamples[i] = class.initialRank + } return &hs } @@ -146,7 +157,7 @@ func (hs *historicStats) computerPenalty() float64 { // counter for ranking a peer. It calculates newGap, which is the // number of counter ticks since it last was updated (i.e. last ranked // after being selected). The number of gaps stored is bounded by the -// windowSize. Calculages and returns the new penalty. +// windowSize. Calculates and returns the new penalty. func (hs *historicStats) updateRequestPenalty(counter uint64) float64 { newGap := counter - hs.counter hs.counter = counter @@ -188,7 +199,7 @@ func (hs *historicStats) resetRequestPenalty(steps int, initialRank int, class p hs.gapSum -= 1.0 / float64(hs.requestGaps[s]) } hs.requestGaps = hs.requestGaps[steps:] - return int(hs.computerPenalty() * (float64(hs.rankSum) / float64(len(hs.rankSamples)))) + return boundRankByClass(int(hs.computerPenalty()*(float64(hs.rankSum)/float64(len(hs.rankSamples)))), class) } // push pushes a new rank to the historicStats, and returns the new @@ -213,13 +224,20 @@ func (hs *historicStats) push(value int, counter uint64, class peerClass) (avera // Download may fail for various reasons. Give it additional tries // and see if it recovers/improves. if value == peerRankDownloadFailed { - // Set the rank to 10 + the class upper bound, to evict - // the peer from the class if it is repeatedly - // failing. This is to make sure to switch to the next - // class when all peers in this class are failing. - // Here, +10 is added. This is of little consequence, and the - // purpose is to avoid rounding errors. - value = upperBound(class) + 10 + hs.downloadFailures++ + // - Set the rank to the class upper bound multiplied + // by the number of downloadFailures. + // - Each downloadFailure increments the counter, and + // each non-failure decrements it, until it gets to 0. + // - When the peer is consistently failing to + // download, the value added to rankSum will + // increase at an increasing rate to evict the peer + // from the class sooner. + value = upperBound(class) * hs.downloadFailures + } else { + if hs.downloadFailures > 0 { + hs.downloadFailures-- + } } hs.rankSamples = append(hs.rankSamples, value) @@ -259,10 +277,10 @@ func makePeerSelector(net peersRetriever, initialPeersClasses []peerClass) *peer return selector } -// GetNextPeer returns the next peer. It randomally selects a peer from a pool that has +// getNextPeer returns the next peer. It randomally selects a peer from a pool that has // the lowest rank value. Given that the peers are grouped by their ranks, allow us to // prioritize peers based on their class and/or performance. -func (ps *peerSelector) GetNextPeer() (peer network.Peer, err error) { +func (ps *peerSelector) getNextPeer() (psp *peerSelectorPeer, err error) { ps.mu.Lock() defer ps.mu.Unlock() ps.refreshAvailablePeers() @@ -273,25 +291,24 @@ func (ps *peerSelector) GetNextPeer() (peer network.Peer, err error) { // provide the needed test. // pick one of the peers from this pool at random peerIdx := crypto.RandUint64() % uint64(len(pool.peers)) - peer = pool.peers[peerIdx].peer + psp = &peerSelectorPeer{pool.peers[peerIdx].peer, pool.peers[peerIdx].class.peerClass} return } } - return nil, errPeerSelectorNoPeerPoolsAvailable } // RankPeer ranks a given peer. // return the old value and the new updated value. // updated value could be different from the input rank. -func (ps *peerSelector) RankPeer(peer network.Peer, rank int) (int, int) { - if peer == nil { +func (ps *peerSelector) RankPeer(psp *peerSelectorPeer, rank int) (int, int) { + if psp == nil { return -1, -1 } ps.mu.Lock() defer ps.mu.Unlock() - poolIdx, peerIdx := ps.findPeer(peer) + poolIdx, peerIdx := ps.findPeer(psp) if poolIdx < 0 || peerIdx < 0 { return -1, -1 } @@ -313,7 +330,7 @@ func (ps *peerSelector) RankPeer(peer network.Peer, rank int) (int, int) { ps.pools = append(ps.pools[:poolIdx], ps.pools[poolIdx+1:]...) } - sortNeeded = ps.addToPool(peer, rank, class, peerHistory) + sortNeeded = ps.addToPool(psp.Peer, rank, class, peerHistory) } // Update the ranks of the peers by reducing the penalty for not beeing selected @@ -321,7 +338,7 @@ func (ps *peerSelector) RankPeer(peer network.Peer, rank int) (int, int) { pool := ps.pools[pl] for pr := len(pool.peers) - 1; pr >= 0; pr-- { localPeer := pool.peers[pr] - if pool.peers[pr].peer == peer { + if pool.peers[pr].peer == psp.Peer { continue } // make the removal of penalty at a faster rate than adding it, so that the @@ -351,10 +368,10 @@ func (ps *peerSelector) RankPeer(peer network.Peer, rank int) (int, int) { } // PeerDownloadDurationToRank calculates the rank for a peer given a peer and the block download time. -func (ps *peerSelector) PeerDownloadDurationToRank(peer network.Peer, blockDownloadDuration time.Duration) (rank int) { +func (ps *peerSelector) PeerDownloadDurationToRank(psp *peerSelectorPeer, blockDownloadDuration time.Duration) (rank int) { ps.mu.Lock() defer ps.mu.Unlock() - poolIdx, peerIdx := ps.findPeer(peer) + poolIdx, peerIdx := ps.findPeer(psp) if poolIdx < 0 || peerIdx < 0 { return peerRankInvalidDownload } @@ -409,11 +426,15 @@ func peerAddress(peer network.Peer) string { // refreshAvailablePeers reload the available peers from the network package, add new peers along with their // corresponding initial rank, and deletes peers that have been dropped by the network package. func (ps *peerSelector) refreshAvailablePeers() { - existingPeers := make(map[string]network.Peer) + existingPeers := make(map[network.PeerOption]map[string]bool) for _, pool := range ps.pools { for _, localPeer := range pool.peers { if peerAddress := peerAddress(localPeer.peer); peerAddress != "" { - existingPeers[peerAddress] = localPeer.peer + // Init the existingPeers map + if _, has := existingPeers[localPeer.class.peerClass]; !has { + existingPeers[localPeer.class.peerClass] = make(map[string]bool) + } + existingPeers[localPeer.class.peerClass][peerAddress] = true } } } @@ -425,12 +446,13 @@ func (ps *peerSelector) refreshAvailablePeers() { if peerAddress == "" { continue } - if _, has := existingPeers[peerAddress]; has { - delete(existingPeers, peerAddress) + if _, has := existingPeers[initClass.peerClass][peerAddress]; has { + // Setting to false instead of deleting the element to be safe against duplicate peer addresses. + existingPeers[initClass.peerClass][peerAddress] = false continue } // it's an entry which we did not have before. - sortNeeded = ps.addToPool(peer, initClass.initialRank, initClass, makeHistoricStatus(peerHistoryWindowSize)) || sortNeeded + sortNeeded = ps.addToPool(peer, initClass.initialRank, initClass, makeHistoricStatus(peerHistoryWindowSize, initClass)) || sortNeeded } } @@ -440,7 +462,7 @@ func (ps *peerSelector) refreshAvailablePeers() { for peerIdx := len(pool.peers) - 1; peerIdx >= 0; peerIdx-- { peer := pool.peers[peerIdx].peer if peerAddress := peerAddress(peer); peerAddress != "" { - if _, has := existingPeers[peerAddress]; has { + if toRemove, _ := existingPeers[pool.peers[peerIdx].class.peerClass][peerAddress]; toRemove { // need to be removed. pool.peers = append(pool.peers[:peerIdx], pool.peers[peerIdx+1:]...) } @@ -461,14 +483,16 @@ func (ps *peerSelector) refreshAvailablePeers() { // findPeer look into the peer pool and find the given peer. // The method returns the pool and peer indices if a peer was found, or (-1, -1) otherwise. -func (ps *peerSelector) findPeer(peer network.Peer) (poolIdx, peerIdx int) { - peerAddr := peerAddress(peer) - if peerAddr != "" { - for i, pool := range ps.pools { - for j, localPeerEntry := range pool.peers { - if peerAddress(localPeerEntry.peer) == peerAddr { - return i, j - } +func (ps *peerSelector) findPeer(psp *peerSelectorPeer) (poolIdx, peerIdx int) { + peerAddr := peerAddress(psp.Peer) + if peerAddr == "" { + return -1, -1 + } + for i, pool := range ps.pools { + for j, localPeerEntry := range pool.peers { + if peerAddress(localPeerEntry.peer) == peerAddr && + localPeerEntry.class.peerClass == psp.peerClass { + return i, j } } } diff --git a/catchup/peerSelector_test.go b/catchup/peerSelector_test.go index 170aba1f17..ed48558e2d 100644 --- a/catchup/peerSelector_test.go +++ b/catchup/peerSelector_test.go @@ -18,7 +18,6 @@ package catchup import ( "context" - "fmt" "net/http" "testing" "time" @@ -119,46 +118,50 @@ func TestPeerSelector(t *testing.T) { }), []peerClass{{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookArchivers}}, ) - peer, err := peerSelector.GetNextPeer() + psp, err := peerSelector.getNextPeer() + peer := psp.Peer require.NoError(t, err) require.Equal(t, "12345", peerAddress(peer)) // replace peer. peers = []network.Peer{&mockHTTPPeer{address: "54321"}} - peer, err = peerSelector.GetNextPeer() + psp, err = peerSelector.getNextPeer() + peer = psp.Peer require.NoError(t, err) require.Equal(t, "54321", peerAddress(peer)) // add another peer peers = []network.Peer{&mockHTTPPeer{address: "54321"}, &mockHTTPPeer{address: "abcde"}} - r1, r2 := peerSelector.RankPeer(peer, 5) + r1, r2 := peerSelector.RankPeer(psp, 5) require.True(t, r1 != r2) - peer, err = peerSelector.GetNextPeer() + psp, err = peerSelector.getNextPeer() + peer = psp.Peer require.NoError(t, err) require.Equal(t, "abcde", peerAddress(peer)) - r1, r2 = peerSelector.RankPeer(peer, 10) + r1, r2 = peerSelector.RankPeer(psp, 200) require.True(t, r1 != r2) - peer, err = peerSelector.GetNextPeer() + psp, err = peerSelector.getNextPeer() + peer = psp.Peer require.NoError(t, err) require.Equal(t, "54321", peerAddress(peer)) peers = []network.Peer{t} // include a non-peer object, to test the refreshAvailablePeers handling of empty addresses. - peer, err = peerSelector.GetNextPeer() + psp, err = peerSelector.getNextPeer() require.Equal(t, errPeerSelectorNoPeerPoolsAvailable, err) - require.Nil(t, peer) + require.Nil(t, psp) // create an empty entry ( even though the code won't let it happen ) peerSelector.pools = []peerPool{{rank: peerRankInitialFirstPriority}} - peer, err = peerSelector.GetNextPeer() + psp, err = peerSelector.getNextPeer() require.Equal(t, errPeerSelectorNoPeerPoolsAvailable, err) - require.Nil(t, peer) + require.Nil(t, psp) r1, r2 = peerSelector.RankPeer(nil, 10) require.False(t, r1 != r2) - r2, r2 = peerSelector.RankPeer(&mockHTTPPeer{address: "abc123"}, 10) + r2, r2 = peerSelector.RankPeer(&peerSelectorPeer{&mockHTTPPeer{address: "abc123"}, 1}, 10) require.False(t, r1 != r2) return @@ -181,14 +184,14 @@ func TestPeerDownloadRanking(t *testing.T) { }), []peerClass{{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookArchivers}, {initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookRelays}}, ) - archivalPeer, err := peerSelector.GetNextPeer() + archivalPeer, err := peerSelector.getNextPeer() require.NoError(t, err) require.Equal(t, downloadDurationToRank(500*time.Millisecond, lowBlockDownloadThreshold, highBlockDownloadThreshold, peerRank0LowBlockTime, peerRank0HighBlockTime), peerSelector.PeerDownloadDurationToRank(archivalPeer, 500*time.Millisecond)) peerSelector.RankPeer(archivalPeer, peerRankInvalidDownload) - archivalPeer, err = peerSelector.GetNextPeer() + archivalPeer, err = peerSelector.getNextPeer() require.NoError(t, err) require.Equal(t, downloadDurationToRank(500*time.Millisecond, lowBlockDownloadThreshold, highBlockDownloadThreshold, peerRank0LowBlockTime, peerRank0HighBlockTime), peerSelector.PeerDownloadDurationToRank(archivalPeer, 500*time.Millisecond)) @@ -196,21 +199,21 @@ func TestPeerDownloadRanking(t *testing.T) { peerSelector.RankPeer(archivalPeer, peerRankInvalidDownload) // and now test the relay peers - relayPeer, err := peerSelector.GetNextPeer() + relayPeer, err := peerSelector.getNextPeer() require.NoError(t, err) require.Equal(t, downloadDurationToRank(500*time.Millisecond, lowBlockDownloadThreshold, highBlockDownloadThreshold, peerRank1LowBlockTime, peerRank1HighBlockTime), peerSelector.PeerDownloadDurationToRank(relayPeer, 500*time.Millisecond)) peerSelector.RankPeer(relayPeer, peerRankInvalidDownload) - relayPeer, err = peerSelector.GetNextPeer() + relayPeer, err = peerSelector.getNextPeer() require.NoError(t, err) require.Equal(t, downloadDurationToRank(500*time.Millisecond, lowBlockDownloadThreshold, highBlockDownloadThreshold, peerRank1LowBlockTime, peerRank1HighBlockTime), peerSelector.PeerDownloadDurationToRank(relayPeer, 500*time.Millisecond)) peerSelector.RankPeer(relayPeer, peerRankInvalidDownload) - require.Equal(t, peerRankInvalidDownload, peerSelector.PeerDownloadDurationToRank(&mockHTTPPeer{address: "abc123"}, time.Millisecond)) + require.Equal(t, peerRankInvalidDownload, peerSelector.PeerDownloadDurationToRank(&peerSelectorPeer{mockHTTPPeer{address: "abc123"}, 0}, time.Millisecond)) } func TestFindMissingPeer(t *testing.T) { @@ -220,7 +223,7 @@ func TestFindMissingPeer(t *testing.T) { }), []peerClass{{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookArchivers}}, ) - poolIdx, peerIdx := peerSelector.findPeer(&mockHTTPPeer{address: "abcd"}) + poolIdx, peerIdx := peerSelector.findPeer(&peerSelectorPeer{mockHTTPPeer{address: "abcd"}, 0}) require.Equal(t, -1, poolIdx) require.Equal(t, -1, peerIdx) } @@ -246,7 +249,8 @@ func TestHistoricData(t *testing.T) { var counters [5]int for i := 0; i < 1000; i++ { - peer, getPeerErr := peerSelector.GetNextPeer() + psp, getPeerErr := peerSelector.getNextPeer() + peer := psp.Peer switch peer.(*mockHTTPPeer).address { case "a1": @@ -274,18 +278,13 @@ func TestHistoricData(t *testing.T) { case "a3": duration = time.Duration(100 * float64(time.Millisecond) * randVal) } - peerRank := peerSelector.PeerDownloadDurationToRank(peer, duration) - peerSelector.RankPeer(peer, peerRank) + peerRank := peerSelector.PeerDownloadDurationToRank(psp, duration) + peerSelector.RankPeer(psp, peerRank) } else { - peerSelector.RankPeer(peer, peerRankDownloadFailed) + peerSelector.RankPeer(psp, peerRankDownloadFailed) } } - fmt.Printf("a1: %d\n", counters[0]) - fmt.Printf("a2: %d\n", counters[1]) - fmt.Printf("a3: %d\n", counters[2]) - fmt.Printf("b1: %d\n", counters[3]) - fmt.Printf("b2: %d\n", counters[4]) require.GreaterOrEqual(t, counters[2], counters[1]) require.GreaterOrEqual(t, counters[2], counters[0]) require.Equal(t, counters[3], 0) @@ -313,7 +312,8 @@ func TestPeersDownloadFailed(t *testing.T) { var counters [5]int for i := 0; i < 1000; i++ { - peer, getPeerErr := peerSelector.GetNextPeer() + psp, getPeerErr := peerSelector.getNextPeer() + peer := psp.Peer switch peer.(*mockHTTPPeer).address { case "a1": @@ -335,21 +335,16 @@ func TestPeersDownloadFailed(t *testing.T) { randVal = randVal + 1 if randVal < 1.98 { duration := time.Duration(100 * float64(time.Millisecond) * randVal) - peerRank := peerSelector.PeerDownloadDurationToRank(peer, duration) - peerSelector.RankPeer(peer, peerRank) + peerRank := peerSelector.PeerDownloadDurationToRank(psp, duration) + peerSelector.RankPeer(psp, peerRank) } else { - peerSelector.RankPeer(peer, peerRankDownloadFailed) + peerSelector.RankPeer(psp, peerRankDownloadFailed) } } else { - peerSelector.RankPeer(peer, peerRankDownloadFailed) + peerSelector.RankPeer(psp, peerRankDownloadFailed) } } - fmt.Printf("a1: %d\n", counters[0]) - fmt.Printf("a2: %d\n", counters[1]) - fmt.Printf("a3: %d\n", counters[2]) - fmt.Printf("b1: %d\n", counters[3]) - fmt.Printf("b2: %d\n", counters[4]) require.GreaterOrEqual(t, counters[3], 20) require.GreaterOrEqual(t, counters[4], 20) @@ -360,7 +355,7 @@ func TestPeersDownloadFailed(t *testing.T) { require.True(t, b1orb2) require.Equal(t, peerSelector.pools[1].rank, 900) require.Equal(t, len(peerSelector.pools[1].peers), 3) - } else { + } else { // len(pools) == 3 b1orb2 := peerAddress(peerSelector.pools[1].peers[0].peer) == "b1" || peerAddress(peerSelector.pools[1].peers[0].peer) == "b2" require.True(t, b1orb2) require.Equal(t, peerSelector.pools[2].rank, 900) @@ -392,7 +387,8 @@ func TestPenalty(t *testing.T) { var counters [5]int for i := 0; i < 1000; i++ { - peer, getPeerErr := peerSelector.GetNextPeer() + psp, getPeerErr := peerSelector.getNextPeer() + peer := psp.Peer switch peer.(*mockHTTPPeer).address { case "a1": counters[0]++ @@ -416,15 +412,10 @@ func TestPenalty(t *testing.T) { case "a3": duration = time.Duration(100 * float64(time.Millisecond)) } - peerRank := peerSelector.PeerDownloadDurationToRank(peer, duration) - peerSelector.RankPeer(peer, peerRank) + peerRank := peerSelector.PeerDownloadDurationToRank(psp, duration) + peerSelector.RankPeer(psp, peerRank) } - fmt.Printf("a1: %d\n", counters[0]) - fmt.Printf("a2: %d\n", counters[1]) - fmt.Printf("a3: %d\n", counters[2]) - fmt.Printf("b1: %d\n", counters[3]) - fmt.Printf("b2: %d\n", counters[4]) require.GreaterOrEqual(t, counters[1], 50) require.GreaterOrEqual(t, counters[2], 2*counters[1]) require.Equal(t, counters[3], 0) @@ -437,7 +428,7 @@ func TestPeerDownloadDurationToRank(t *testing.T) { peers1 := []network.Peer{&mockHTTPPeer{address: "a1"}, &mockHTTPPeer{address: "a2"}, &mockHTTPPeer{address: "a3"}} peers2 := []network.Peer{&mockHTTPPeer{address: "b1"}, &mockHTTPPeer{address: "b2"}} peers3 := []network.Peer{&mockHTTPPeer{address: "c1"}, &mockHTTPPeer{address: "c2"}} - peers4 := []network.Peer{&mockHTTPPeer{address: "d1"}, &mockHTTPPeer{address: "d2"}} + peers4 := []network.Peer{&mockHTTPPeer{address: "d1"}, &mockHTTPPeer{address: "b2"}} peerSelector := makePeerSelector( makePeersRetrieverStub(func(options ...network.PeerOption) (peers []network.Peer) { @@ -459,17 +450,17 @@ func TestPeerDownloadDurationToRank(t *testing.T) { {initialRank: peerRankInitialFourthPriority, peerClass: network.PeersConnectedIn}}, ) - _, err := peerSelector.GetNextPeer() + _, err := peerSelector.getNextPeer() require.NoError(t, err) require.Equal(t, downloadDurationToRank(500*time.Millisecond, lowBlockDownloadThreshold, highBlockDownloadThreshold, peerRank0LowBlockTime, peerRank0HighBlockTime), - peerSelector.PeerDownloadDurationToRank(peers1[0], 500*time.Millisecond)) + peerSelector.PeerDownloadDurationToRank(&peerSelectorPeer{peers1[0], network.PeersPhonebookArchivers}, 500*time.Millisecond)) require.Equal(t, downloadDurationToRank(500*time.Millisecond, lowBlockDownloadThreshold, highBlockDownloadThreshold, peerRank1LowBlockTime, peerRank1HighBlockTime), - peerSelector.PeerDownloadDurationToRank(peers2[0], 500*time.Millisecond)) + peerSelector.PeerDownloadDurationToRank(&peerSelectorPeer{peers2[0], network.PeersPhonebookRelays}, 500*time.Millisecond)) require.Equal(t, downloadDurationToRank(500*time.Millisecond, lowBlockDownloadThreshold, highBlockDownloadThreshold, peerRank2LowBlockTime, peerRank2HighBlockTime), - peerSelector.PeerDownloadDurationToRank(peers3[0], 500*time.Millisecond)) + peerSelector.PeerDownloadDurationToRank(&peerSelectorPeer{peers3[0], network.PeersConnectedOut}, 500*time.Millisecond)) require.Equal(t, downloadDurationToRank(500*time.Millisecond, lowBlockDownloadThreshold, highBlockDownloadThreshold, peerRank3LowBlockTime, peerRank3HighBlockTime), - peerSelector.PeerDownloadDurationToRank(peers4[0], 500*time.Millisecond)) + peerSelector.PeerDownloadDurationToRank(&peerSelectorPeer{peers4[0], network.PeersConnectedIn}, 500*time.Millisecond)) } func TestLowerUpperBounds(t *testing.T) { @@ -491,7 +482,7 @@ func TestLowerUpperBounds(t *testing.T) { func TestFullResetRequestPenalty(t *testing.T) { class := peerClass{initialRank: 10, peerClass: network.PeersPhonebookArchivers} - hs := makeHistoricStatus(10) + hs := makeHistoricStatus(10, class) hs.push(5, 1, class) require.Equal(t, 1, len(hs.requestGaps)) diff --git a/catchup/service.go b/catchup/service.go index 7d34b4a7cc..b7587f9792 100644 --- a/catchup/service.go +++ b/catchup/service.go @@ -208,7 +208,8 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool, return false } - peer, getPeerErr := peerSelector.GetNextPeer() + psp, getPeerErr := peerSelector.getNextPeer() + peer := psp.Peer if getPeerErr != nil { s.log.Debugf("fetchAndWrite: was unable to obtain a peer to retrieve the block from") break @@ -219,7 +220,7 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool, if err != nil { s.log.Debugf("fetchAndWrite(%v): Could not fetch: %v (attempt %d)", r, err, i) - peerSelector.RankPeer(peer, peerRankDownloadFailed) + peerSelector.RankPeer(psp, peerRankDownloadFailed) // we've just failed to retrieve a block; wait until the previous block is fetched before trying again // to avoid the usecase where the first block doesn't exists and we're making many requests down the chain // for no reason. @@ -245,7 +246,7 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool, // Check that the block's contents match the block header (necessary with an untrusted block because b.Hash() only hashes the header) if s.cfg.CatchupVerifyPaysetHash() { if !block.ContentsMatchHeader() { - peerSelector.RankPeer(peer, peerRankInvalidDownload) + peerSelector.RankPeer(psp, peerRankInvalidDownload) // Check if this mismatch is due to an unsupported protocol version if _, ok := config.Consensus[block.BlockHeader.CurrentProtocol]; !ok { s.log.Errorf("fetchAndWrite(%v): unsupported protocol version detected: '%v'", r, block.BlockHeader.CurrentProtocol) @@ -274,13 +275,13 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool, err = s.auth.Authenticate(block, cert) if err != nil { s.log.Warnf("fetchAndWrite(%v): cert did not authenticate block (attempt %d): %v", r, i, err) - peerSelector.RankPeer(peer, peerRankInvalidDownload) + peerSelector.RankPeer(psp, peerRankInvalidDownload) continue // retry the fetch } } - peerRank := peerSelector.PeerDownloadDurationToRank(peer, blockDownloadDuration) - r1, r2 := peerSelector.RankPeer(peer, peerRank) + peerRank := peerSelector.PeerDownloadDurationToRank(psp, blockDownloadDuration) + r1, r2 := peerSelector.RankPeer(psp, peerRank) s.log.Debugf("fetchAndWrite(%d): ranked peer with %d from %d to %d", r, peerRank, r1, r2) // Write to ledger, noting that ledger writes must be in order @@ -383,7 +384,7 @@ func (s *Service) pipelinedFetch(seedLookback uint64) { peerSelector := s.createPeerSelector(true) - if _, err := peerSelector.GetNextPeer(); err == errPeerSelectorNoPeerPoolsAvailable { + if _, err := peerSelector.getNextPeer(); err == errPeerSelectorNoPeerPoolsAvailable { s.log.Debugf("pipelinedFetch: was unable to obtain a peer to retrieve the block from") return } @@ -602,7 +603,8 @@ func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.Asy blockHash := bookkeeping.BlockHash(cert.Proposal.BlockDigest) // semantic digest (i.e., hash of the block header), not byte-for-byte digest peerSelector := s.createPeerSelector(false) for s.ledger.LastRound() < cert.Round { - peer, getPeerErr := peerSelector.GetNextPeer() + psp, getPeerErr := peerSelector.getNextPeer() + peer := psp.Peer if getPeerErr != nil { s.log.Debugf("fetchRound: was unable to obtain a peer to retrieve the block from") s.net.RequestConnectOutgoing(true, s.ctx.Done()) @@ -620,7 +622,7 @@ func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.Asy default: } logging.Base().Warnf("fetchRound could not acquire block, fetcher errored out: %v", err) - peerSelector.RankPeer(peer, peerRankDownloadFailed) + peerSelector.RankPeer(psp, peerRankDownloadFailed) continue } @@ -630,7 +632,7 @@ func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.Asy } // Otherwise, fetcher gave us the wrong block logging.Base().Warnf("fetcher gave us bad/wrong block (for round %d): fetched hash %v; want hash %v", cert.Round, block.Hash(), blockHash) - peerSelector.RankPeer(peer, peerRankInvalidDownload) + peerSelector.RankPeer(psp, peerRankInvalidDownload) // As a failsafe, if the cert we fetched is valid but for the wrong block, panic as loudly as possible if cert.Round == fetchedCert.Round && diff --git a/test/e2e-go/features/catchup/catchpointCatchup_test.go b/test/e2e-go/features/catchup/catchpointCatchup_test.go index 84b4e2f6c9..54e15b0b07 100644 --- a/test/e2e-go/features/catchup/catchpointCatchup_test.go +++ b/test/e2e-go/features/catchup/catchpointCatchup_test.go @@ -119,9 +119,6 @@ func TestBasicCatchpointCatchup(t *testing.T) { errorsCollector := nodeExitErrorCollector{t: fixtures.SynchronizedTest(t)} defer errorsCollector.Print() - // Give the second node (which starts up last) all the stake so that its proposal always has better credentials, - // and so that its proposal isn't dropped. Otherwise the test burns 17s to recover. We don't care about stake - // distribution for catchup so this is fine. fixture.SetupNoStart(t, filepath.Join("nettemplates", "CatchpointCatchupTestNetwork.json")) // Get primary node @@ -217,8 +214,8 @@ func TestBasicCatchpointCatchup(t *testing.T) { log.Infof("primary node latest catchpoint - %s!\n", *primaryNodeStatus.LastCatchpoint) secondNodeRestClient.Catchup(*primaryNodeStatus.LastCatchpoint) - currentRound = uint64(36) - targetRound = uint64(37) + currentRound = primaryNodeStatus.LastRound + targetRound = currentRound + 1 log.Infof("Second node catching up to round 36") for { err = fixture.ClientWaitForRound(secondNodeRestClient, currentRound, 10*time.Second) From d8b73f87d004df78bf0b6896e87d24c32c44ffd8 Mon Sep 17 00:00:00 2001 From: algonautshant Date: Tue, 15 Jun 2021 02:06:04 -0400 Subject: [PATCH 2/4] Added tests, exponential increase of download failure impact, local functions. --- catchup/catchpointService.go | 30 +++---- catchup/peerSelector.go | 10 +-- catchup/peerSelector_test.go | 154 ++++++++++++++++++++++++++++------- catchup/service.go | 14 ++-- 4 files changed, 153 insertions(+), 55 deletions(-) diff --git a/catchup/catchpointService.go b/catchup/catchpointService.go index a682a4a428..fa9639a986 100644 --- a/catchup/catchpointService.go +++ b/catchup/catchpointService.go @@ -294,9 +294,9 @@ func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) { break } // failed to build the merkle trie for the above catchpoint file. - peerSelector.RankPeer(psp, peerRankInvalidDownload) + peerSelector.rankPeer(psp, peerRankInvalidDownload) } else { - peerSelector.RankPeer(psp, peerRankDownloadFailed) + peerSelector.rankPeer(psp, peerRankDownloadFailed) } // instead of testing for err == cs.ctx.Err() , we'll check on the context itself. @@ -365,7 +365,7 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err erro if attemptsCount <= cs.config.CatchupBlockDownloadRetryAttempts { // try again. blk = nil - cs.blocksDownloadPeerSelector.RankPeer(psp, peerRankInvalidDownload) + cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload) continue } return cs.abort(fmt.Errorf("processStageLastestBlockDownload: unsupported protocol version detected: '%v'", blk.BlockHeader.CurrentProtocol)) @@ -377,7 +377,7 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err erro if attemptsCount <= cs.config.CatchupBlockDownloadRetryAttempts { // try again. blk = nil - cs.blocksDownloadPeerSelector.RankPeer(psp, peerRankInvalidDownload) + cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload) continue } return cs.abort(fmt.Errorf("processStageLastestBlockDownload: genesis hash mismatches : genesis hash on genesis.json file is %v while genesis hash of downloaded block is %v", cs.ledger.GenesisHash(), blk.GenesisHash())) @@ -390,7 +390,7 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err erro if attemptsCount <= cs.config.CatchupBlockDownloadRetryAttempts { // try again. blk = nil - cs.blocksDownloadPeerSelector.RankPeer(psp, peerRankInvalidDownload) + cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload) continue } return cs.abort(fmt.Errorf("processStageLastestBlockDownload: downloaded block content does not match downloaded block header")) @@ -406,14 +406,14 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err erro // try again. blk = nil cs.log.Infof("processStageLastestBlockDownload: block %d verification against catchpoint failed, another attempt will be made; err = %v", blockRound, err) - cs.blocksDownloadPeerSelector.RankPeer(psp, peerRankInvalidDownload) + cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload) continue } return cs.abort(fmt.Errorf("processStageLastestBlockDownload failed when calling VerifyCatchpoint : %v", err)) } // give a rank to the download, as the download was successful. - peerRank := cs.blocksDownloadPeerSelector.PeerDownloadDurationToRank(psp, blockDownloadDuration) - cs.blocksDownloadPeerSelector.RankPeer(psp, peerRank) + peerRank := cs.blocksDownloadPeerSelector.peerDownloadDurationToRank(psp, blockDownloadDuration) + cs.blocksDownloadPeerSelector.rankPeer(psp, peerRank) err = cs.ledgerAccessor.StoreBalancesRound(cs.ctx, blk) if err != nil { @@ -516,7 +516,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) { // not identical, retry download. cs.log.Warnf("processStageBlocksDownload downloaded block(%d) did not match it's successor(%d) block hash %v != %v", blk.Round(), prevBlock.Round(), blk.Hash(), prevBlock.BlockHeader.Branch) cs.updateBlockRetrievalStatistics(-1, 0) - cs.blocksDownloadPeerSelector.RankPeer(psp, peerRankInvalidDownload) + cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload) if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) { // try again. retryCount++ @@ -529,7 +529,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) { if _, ok := config.Consensus[blk.BlockHeader.CurrentProtocol]; !ok { cs.log.Warnf("processStageBlocksDownload: unsupported protocol version detected: '%v'", blk.BlockHeader.CurrentProtocol) cs.updateBlockRetrievalStatistics(-1, 0) - cs.blocksDownloadPeerSelector.RankPeer(psp, peerRankInvalidDownload) + cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload) if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) { // try again. retryCount++ @@ -542,7 +542,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) { if !blk.ContentsMatchHeader() { cs.log.Warnf("processStageBlocksDownload: downloaded block content does not match downloaded block header") // try again. - cs.blocksDownloadPeerSelector.RankPeer(psp, peerRankInvalidDownload) + cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload) cs.updateBlockRetrievalStatistics(-1, 0) if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) { // try again. @@ -553,8 +553,8 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) { } cs.updateBlockRetrievalStatistics(0, 1) - peerRank := cs.blocksDownloadPeerSelector.PeerDownloadDurationToRank(psp, blockDownloadDuration) - cs.blocksDownloadPeerSelector.RankPeer(psp, peerRank) + peerRank := cs.blocksDownloadPeerSelector.peerDownloadDurationToRank(psp, blockDownloadDuration) + cs.blocksDownloadPeerSelector.rankPeer(psp, peerRank) // all good, persist and move on. err = cs.ledgerAccessor.StoreBlock(cs.ctx, blk) @@ -593,7 +593,7 @@ func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount ui httpPeer, validPeer := peer.(network.HTTPPeer) if !validPeer { cs.log.Warnf("fetchBlock: non-HTTP peer was provided by the peer selector") - cs.blocksDownloadPeerSelector.RankPeer(psp, peerRankInvalidDownload) + cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload) if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) { // try again. return nil, time.Duration(0), psp, false, nil @@ -609,7 +609,7 @@ func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount ui if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) { // try again. cs.log.Infof("Failed to download block %d on attempt %d out of %d. %v", round, retryCount, cs.config.CatchupBlockDownloadRetryAttempts, err) - cs.blocksDownloadPeerSelector.RankPeer(psp, peerRankDownloadFailed) + cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankDownloadFailed) return nil, time.Duration(0), psp, false, nil } return nil, time.Duration(0), psp, true, cs.abort(fmt.Errorf("fetchBlock failed after multiple blocks download attempts")) diff --git a/catchup/peerSelector.go b/catchup/peerSelector.go index b38d05aade..ac96b823c8 100644 --- a/catchup/peerSelector.go +++ b/catchup/peerSelector.go @@ -233,7 +233,7 @@ func (hs *historicStats) push(value int, counter uint64, class peerClass) (avera // download, the value added to rankSum will // increase at an increasing rate to evict the peer // from the class sooner. - value = upperBound(class) * hs.downloadFailures + value = upperBound(class) * int(math.Exp2(float64(hs.downloadFailures))) } else { if hs.downloadFailures > 0 { hs.downloadFailures-- @@ -298,10 +298,10 @@ func (ps *peerSelector) getNextPeer() (psp *peerSelectorPeer, err error) { return nil, errPeerSelectorNoPeerPoolsAvailable } -// RankPeer ranks a given peer. +// rankPeer ranks a given peer. // return the old value and the new updated value. // updated value could be different from the input rank. -func (ps *peerSelector) RankPeer(psp *peerSelectorPeer, rank int) (int, int) { +func (ps *peerSelector) rankPeer(psp *peerSelectorPeer, rank int) (int, int) { if psp == nil { return -1, -1 } @@ -367,8 +367,8 @@ func (ps *peerSelector) RankPeer(psp *peerSelectorPeer, rank int) (int, int) { return initialRank, rank } -// PeerDownloadDurationToRank calculates the rank for a peer given a peer and the block download time. -func (ps *peerSelector) PeerDownloadDurationToRank(psp *peerSelectorPeer, blockDownloadDuration time.Duration) (rank int) { +// peerDownloadDurationToRank calculates the rank for a peer given a peer and the block download time. +func (ps *peerSelector) peerDownloadDurationToRank(psp *peerSelectorPeer, blockDownloadDuration time.Duration) (rank int) { ps.mu.Lock() defer ps.mu.Unlock() poolIdx, peerIdx := ps.findPeer(psp) diff --git a/catchup/peerSelector_test.go b/catchup/peerSelector_test.go index ed48558e2d..fa04ddb087 100644 --- a/catchup/peerSelector_test.go +++ b/catchup/peerSelector_test.go @@ -132,7 +132,7 @@ func TestPeerSelector(t *testing.T) { // add another peer peers = []network.Peer{&mockHTTPPeer{address: "54321"}, &mockHTTPPeer{address: "abcde"}} - r1, r2 := peerSelector.RankPeer(psp, 5) + r1, r2 := peerSelector.rankPeer(psp, 5) require.True(t, r1 != r2) psp, err = peerSelector.getNextPeer() @@ -140,7 +140,7 @@ func TestPeerSelector(t *testing.T) { require.NoError(t, err) require.Equal(t, "abcde", peerAddress(peer)) - r1, r2 = peerSelector.RankPeer(psp, 200) + r1, r2 = peerSelector.rankPeer(psp, 200) require.True(t, r1 != r2) psp, err = peerSelector.getNextPeer() @@ -159,9 +159,9 @@ func TestPeerSelector(t *testing.T) { require.Equal(t, errPeerSelectorNoPeerPoolsAvailable, err) require.Nil(t, psp) - r1, r2 = peerSelector.RankPeer(nil, 10) + r1, r2 = peerSelector.rankPeer(nil, 10) require.False(t, r1 != r2) - r2, r2 = peerSelector.RankPeer(&peerSelectorPeer{&mockHTTPPeer{address: "abc123"}, 1}, 10) + r2, r2 = peerSelector.rankPeer(&peerSelectorPeer{&mockHTTPPeer{address: "abc123"}, 1}, 10) require.False(t, r1 != r2) return @@ -187,33 +187,33 @@ func TestPeerDownloadRanking(t *testing.T) { archivalPeer, err := peerSelector.getNextPeer() require.NoError(t, err) - require.Equal(t, downloadDurationToRank(500*time.Millisecond, lowBlockDownloadThreshold, highBlockDownloadThreshold, peerRank0LowBlockTime, peerRank0HighBlockTime), peerSelector.PeerDownloadDurationToRank(archivalPeer, 500*time.Millisecond)) + require.Equal(t, downloadDurationToRank(500*time.Millisecond, lowBlockDownloadThreshold, highBlockDownloadThreshold, peerRank0LowBlockTime, peerRank0HighBlockTime), peerSelector.peerDownloadDurationToRank(archivalPeer, 500*time.Millisecond)) - peerSelector.RankPeer(archivalPeer, peerRankInvalidDownload) + peerSelector.rankPeer(archivalPeer, peerRankInvalidDownload) archivalPeer, err = peerSelector.getNextPeer() require.NoError(t, err) - require.Equal(t, downloadDurationToRank(500*time.Millisecond, lowBlockDownloadThreshold, highBlockDownloadThreshold, peerRank0LowBlockTime, peerRank0HighBlockTime), peerSelector.PeerDownloadDurationToRank(archivalPeer, 500*time.Millisecond)) + require.Equal(t, downloadDurationToRank(500*time.Millisecond, lowBlockDownloadThreshold, highBlockDownloadThreshold, peerRank0LowBlockTime, peerRank0HighBlockTime), peerSelector.peerDownloadDurationToRank(archivalPeer, 500*time.Millisecond)) - peerSelector.RankPeer(archivalPeer, peerRankInvalidDownload) + peerSelector.rankPeer(archivalPeer, peerRankInvalidDownload) // and now test the relay peers relayPeer, err := peerSelector.getNextPeer() require.NoError(t, err) - require.Equal(t, downloadDurationToRank(500*time.Millisecond, lowBlockDownloadThreshold, highBlockDownloadThreshold, peerRank1LowBlockTime, peerRank1HighBlockTime), peerSelector.PeerDownloadDurationToRank(relayPeer, 500*time.Millisecond)) + require.Equal(t, downloadDurationToRank(500*time.Millisecond, lowBlockDownloadThreshold, highBlockDownloadThreshold, peerRank1LowBlockTime, peerRank1HighBlockTime), peerSelector.peerDownloadDurationToRank(relayPeer, 500*time.Millisecond)) - peerSelector.RankPeer(relayPeer, peerRankInvalidDownload) + peerSelector.rankPeer(relayPeer, peerRankInvalidDownload) relayPeer, err = peerSelector.getNextPeer() require.NoError(t, err) - require.Equal(t, downloadDurationToRank(500*time.Millisecond, lowBlockDownloadThreshold, highBlockDownloadThreshold, peerRank1LowBlockTime, peerRank1HighBlockTime), peerSelector.PeerDownloadDurationToRank(relayPeer, 500*time.Millisecond)) + require.Equal(t, downloadDurationToRank(500*time.Millisecond, lowBlockDownloadThreshold, highBlockDownloadThreshold, peerRank1LowBlockTime, peerRank1HighBlockTime), peerSelector.peerDownloadDurationToRank(relayPeer, 500*time.Millisecond)) - peerSelector.RankPeer(relayPeer, peerRankInvalidDownload) + peerSelector.rankPeer(relayPeer, peerRankInvalidDownload) - require.Equal(t, peerRankInvalidDownload, peerSelector.PeerDownloadDurationToRank(&peerSelectorPeer{mockHTTPPeer{address: "abc123"}, 0}, time.Millisecond)) + require.Equal(t, peerRankInvalidDownload, peerSelector.peerDownloadDurationToRank(&peerSelectorPeer{mockHTTPPeer{address: "abc123"}, 0}, time.Millisecond)) } func TestFindMissingPeer(t *testing.T) { @@ -278,10 +278,10 @@ func TestHistoricData(t *testing.T) { case "a3": duration = time.Duration(100 * float64(time.Millisecond) * randVal) } - peerRank := peerSelector.PeerDownloadDurationToRank(psp, duration) - peerSelector.RankPeer(psp, peerRank) + peerRank := peerSelector.peerDownloadDurationToRank(psp, duration) + peerSelector.rankPeer(psp, peerRank) } else { - peerSelector.RankPeer(psp, peerRankDownloadFailed) + peerSelector.rankPeer(psp, peerRankDownloadFailed) } } @@ -335,13 +335,13 @@ func TestPeersDownloadFailed(t *testing.T) { randVal = randVal + 1 if randVal < 1.98 { duration := time.Duration(100 * float64(time.Millisecond) * randVal) - peerRank := peerSelector.PeerDownloadDurationToRank(psp, duration) - peerSelector.RankPeer(psp, peerRank) + peerRank := peerSelector.peerDownloadDurationToRank(psp, duration) + peerSelector.rankPeer(psp, peerRank) } else { - peerSelector.RankPeer(psp, peerRankDownloadFailed) + peerSelector.rankPeer(psp, peerRankDownloadFailed) } } else { - peerSelector.RankPeer(psp, peerRankDownloadFailed) + peerSelector.rankPeer(psp, peerRankDownloadFailed) } } @@ -412,8 +412,8 @@ func TestPenalty(t *testing.T) { case "a3": duration = time.Duration(100 * float64(time.Millisecond)) } - peerRank := peerSelector.PeerDownloadDurationToRank(psp, duration) - peerSelector.RankPeer(psp, peerRank) + peerRank := peerSelector.peerDownloadDurationToRank(psp, duration) + peerSelector.rankPeer(psp, peerRank) } require.GreaterOrEqual(t, counters[1], 50) @@ -422,8 +422,8 @@ func TestPenalty(t *testing.T) { require.Equal(t, counters[4], 0) } -// TestPeerDownloadDurationToRank tests all the cases handled by PeerDownloadDurationToRank -func TestPeerDownloadDurationToRank(t *testing.T) { +// TestpeerDownloadDurationToRank tests all the cases handled by peerDownloadDurationToRank +func TestpeerDownloadDurationToRank(t *testing.T) { peers1 := []network.Peer{&mockHTTPPeer{address: "a1"}, &mockHTTPPeer{address: "a2"}, &mockHTTPPeer{address: "a3"}} peers2 := []network.Peer{&mockHTTPPeer{address: "b1"}, &mockHTTPPeer{address: "b2"}} @@ -454,13 +454,13 @@ func TestPeerDownloadDurationToRank(t *testing.T) { require.NoError(t, err) require.Equal(t, downloadDurationToRank(500*time.Millisecond, lowBlockDownloadThreshold, highBlockDownloadThreshold, peerRank0LowBlockTime, peerRank0HighBlockTime), - peerSelector.PeerDownloadDurationToRank(&peerSelectorPeer{peers1[0], network.PeersPhonebookArchivers}, 500*time.Millisecond)) + peerSelector.peerDownloadDurationToRank(&peerSelectorPeer{peers1[0], network.PeersPhonebookArchivers}, 500*time.Millisecond)) require.Equal(t, downloadDurationToRank(500*time.Millisecond, lowBlockDownloadThreshold, highBlockDownloadThreshold, peerRank1LowBlockTime, peerRank1HighBlockTime), - peerSelector.PeerDownloadDurationToRank(&peerSelectorPeer{peers2[0], network.PeersPhonebookRelays}, 500*time.Millisecond)) + peerSelector.peerDownloadDurationToRank(&peerSelectorPeer{peers2[0], network.PeersPhonebookRelays}, 500*time.Millisecond)) require.Equal(t, downloadDurationToRank(500*time.Millisecond, lowBlockDownloadThreshold, highBlockDownloadThreshold, peerRank2LowBlockTime, peerRank2HighBlockTime), - peerSelector.PeerDownloadDurationToRank(&peerSelectorPeer{peers3[0], network.PeersConnectedOut}, 500*time.Millisecond)) + peerSelector.peerDownloadDurationToRank(&peerSelectorPeer{peers3[0], network.PeersConnectedOut}, 500*time.Millisecond)) require.Equal(t, downloadDurationToRank(500*time.Millisecond, lowBlockDownloadThreshold, highBlockDownloadThreshold, peerRank3LowBlockTime, peerRank3HighBlockTime), - peerSelector.PeerDownloadDurationToRank(&peerSelectorPeer{peers4[0], network.PeersConnectedIn}, 500*time.Millisecond)) + peerSelector.peerDownloadDurationToRank(&peerSelectorPeer{peers4[0], network.PeersConnectedIn}, 500*time.Millisecond)) } func TestLowerUpperBounds(t *testing.T) { @@ -489,3 +489,101 @@ func TestFullResetRequestPenalty(t *testing.T) { hs.resetRequestPenalty(0, 0, class) require.Equal(t, 0, len(hs.requestGaps)) } + +// TestClassUpperBound makes sure the peer rank does not exceed the class upper bound +// This was a bug where the resetRequestPenalty was not bounding the returned rank, and was having download failures. +// Initializing rankSamples to 0 makes this works, since the dropped value subtracts 0 from rankSum. +func TestClassUpperBound(t *testing.T) { + + peers1 := []network.Peer{&mockHTTPPeer{address: "a1"}, &mockHTTPPeer{address: "a2"}} + pClass := peerClass{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookArchivers} + peerSelector := makePeerSelector( + makePeersRetrieverStub(func(options ...network.PeerOption) (peers []network.Peer) { + for _, opt := range options { + if opt == network.PeersPhonebookArchivers { + peers = append(peers, peers1...) + } + } + return + }), []peerClass{pClass}) + + _, err := peerSelector.getNextPeer() + require.NoError(t, err) + for i := 0; i < 200; i++ { + psp, err := peerSelector.getNextPeer() + require.NoError(t, err) + if i < 6 { + peerSelector.rankPeer(psp, peerRankDownloadFailed) + } else { + peerSelector.rankPeer(psp, upperBound(pClass)) + } + for _, pool := range peerSelector.pools { + require.LessOrEqual(t, pool.rank, upperBound(pClass)) + } + } +} + +// TestClassLowerBound makes sure the peer rank does not go under the class lower bound +// This was a bug where the resetRequestPenalty was not bounding the returned rank, and the rankSum was not +// initialized to give the average of class.initialRank +func TestClassLowerBound(t *testing.T) { + + peers1 := []network.Peer{&mockHTTPPeer{address: "a1"}, &mockHTTPPeer{address: "a2"}} + pClass := peerClass{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookArchivers} + peerSelector := makePeerSelector( + makePeersRetrieverStub(func(options ...network.PeerOption) (peers []network.Peer) { + for _, opt := range options { + if opt == network.PeersPhonebookArchivers { + peers = append(peers, peers1...) + } + } + return + }), []peerClass{pClass}) + + _, err := peerSelector.getNextPeer() + require.NoError(t, err) + for i := 0; i < 10; i++ { + psp, err := peerSelector.getNextPeer() + require.NoError(t, err) + peerSelector.rankPeer(psp, lowerBound(pClass)) + + for _, pool := range peerSelector.pools { + require.GreaterOrEqual(t, pool.rank, pool.peers[0].class.initialRank) + } + } +} + +// TestEviction tests that the peer is evicted after several download failures, and it handles same address for different peer classes +func TestEvictionAndUpgrade(t *testing.T) { + + peers1 := []network.Peer{&mockHTTPPeer{address: "a1"}} + peers2 := []network.Peer{&mockHTTPPeer{address: "a1"}} + + peerSelector := makePeerSelector( + makePeersRetrieverStub(func(options ...network.PeerOption) (peers []network.Peer) { + for _, opt := range options { + if opt == network.PeersPhonebookArchivers { + peers = append(peers, peers1...) + } else { + peers = append(peers, peers2...) + } + } + return + }), []peerClass{{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookArchivers}, + {initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookRelays}}, + ) + + _, err := peerSelector.getNextPeer() + require.NoError(t, err) + for i := 0; i < 10; i++ { + if peerSelector.pools[len(peerSelector.pools)-1].rank == 900 { + require.Equal(t, 6, i) + break + } + psp, err := peerSelector.getNextPeer() + require.NoError(t, err) + peerSelector.rankPeer(psp, peerRankDownloadFailed) + } + psp, err := peerSelector.getNextPeer() + require.Equal(t, psp.peerClass, network.PeersPhonebookRelays) +} diff --git a/catchup/service.go b/catchup/service.go index b7587f9792..7f5de66f9b 100644 --- a/catchup/service.go +++ b/catchup/service.go @@ -220,7 +220,7 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool, if err != nil { s.log.Debugf("fetchAndWrite(%v): Could not fetch: %v (attempt %d)", r, err, i) - peerSelector.RankPeer(psp, peerRankDownloadFailed) + peerSelector.rankPeer(psp, peerRankDownloadFailed) // we've just failed to retrieve a block; wait until the previous block is fetched before trying again // to avoid the usecase where the first block doesn't exists and we're making many requests down the chain // for no reason. @@ -246,7 +246,7 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool, // Check that the block's contents match the block header (necessary with an untrusted block because b.Hash() only hashes the header) if s.cfg.CatchupVerifyPaysetHash() { if !block.ContentsMatchHeader() { - peerSelector.RankPeer(psp, peerRankInvalidDownload) + peerSelector.rankPeer(psp, peerRankInvalidDownload) // Check if this mismatch is due to an unsupported protocol version if _, ok := config.Consensus[block.BlockHeader.CurrentProtocol]; !ok { s.log.Errorf("fetchAndWrite(%v): unsupported protocol version detected: '%v'", r, block.BlockHeader.CurrentProtocol) @@ -275,13 +275,13 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool, err = s.auth.Authenticate(block, cert) if err != nil { s.log.Warnf("fetchAndWrite(%v): cert did not authenticate block (attempt %d): %v", r, i, err) - peerSelector.RankPeer(psp, peerRankInvalidDownload) + peerSelector.rankPeer(psp, peerRankInvalidDownload) continue // retry the fetch } } - peerRank := peerSelector.PeerDownloadDurationToRank(psp, blockDownloadDuration) - r1, r2 := peerSelector.RankPeer(psp, peerRank) + peerRank := peerSelector.peerDownloadDurationToRank(psp, blockDownloadDuration) + r1, r2 := peerSelector.rankPeer(psp, peerRank) s.log.Debugf("fetchAndWrite(%d): ranked peer with %d from %d to %d", r, peerRank, r1, r2) // Write to ledger, noting that ledger writes must be in order @@ -622,7 +622,7 @@ func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.Asy default: } logging.Base().Warnf("fetchRound could not acquire block, fetcher errored out: %v", err) - peerSelector.RankPeer(psp, peerRankDownloadFailed) + peerSelector.rankPeer(psp, peerRankDownloadFailed) continue } @@ -632,7 +632,7 @@ func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.Asy } // Otherwise, fetcher gave us the wrong block logging.Base().Warnf("fetcher gave us bad/wrong block (for round %d): fetched hash %v; want hash %v", cert.Round, block.Hash(), blockHash) - peerSelector.RankPeer(psp, peerRankInvalidDownload) + peerSelector.rankPeer(psp, peerRankInvalidDownload) // As a failsafe, if the cert we fetched is valid but for the wrong block, panic as loudly as possible if cert.Round == fetchedCert.Round && From 4aa0f489eb7aaf5cb36d102418c3e0e964997bfa Mon Sep 17 00:00:00 2001 From: algonautshant Date: Tue, 15 Jun 2021 02:08:47 -0400 Subject: [PATCH 3/4] test for error first --- catchup/peerSelector_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/catchup/peerSelector_test.go b/catchup/peerSelector_test.go index fa04ddb087..93e724459f 100644 --- a/catchup/peerSelector_test.go +++ b/catchup/peerSelector_test.go @@ -119,15 +119,15 @@ func TestPeerSelector(t *testing.T) { ) psp, err := peerSelector.getNextPeer() - peer := psp.Peer require.NoError(t, err) + peer := psp.Peer require.Equal(t, "12345", peerAddress(peer)) // replace peer. peers = []network.Peer{&mockHTTPPeer{address: "54321"}} psp, err = peerSelector.getNextPeer() - peer = psp.Peer require.NoError(t, err) + peer = psp.Peer require.Equal(t, "54321", peerAddress(peer)) // add another peer @@ -136,16 +136,16 @@ func TestPeerSelector(t *testing.T) { require.True(t, r1 != r2) psp, err = peerSelector.getNextPeer() - peer = psp.Peer require.NoError(t, err) + peer = psp.Peer require.Equal(t, "abcde", peerAddress(peer)) r1, r2 = peerSelector.rankPeer(psp, 200) require.True(t, r1 != r2) psp, err = peerSelector.getNextPeer() - peer = psp.Peer require.NoError(t, err) + peer = psp.Peer require.Equal(t, "54321", peerAddress(peer)) peers = []network.Peer{t} // include a non-peer object, to test the refreshAvailablePeers handling of empty addresses. From 9d8a46007f0c6eca74f05abe82485da332996bbf Mon Sep 17 00:00:00 2001 From: algonautshant Date: Tue, 15 Jun 2021 12:24:44 -0400 Subject: [PATCH 4/4] Fix the test name so it runs --- catchup/peerSelector_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/catchup/peerSelector_test.go b/catchup/peerSelector_test.go index 93e724459f..5a12b7d3e2 100644 --- a/catchup/peerSelector_test.go +++ b/catchup/peerSelector_test.go @@ -422,8 +422,8 @@ func TestPenalty(t *testing.T) { require.Equal(t, counters[4], 0) } -// TestpeerDownloadDurationToRank tests all the cases handled by peerDownloadDurationToRank -func TestpeerDownloadDurationToRank(t *testing.T) { +// TestPeerDownloadDurationToRank tests all the cases handled by peerDownloadDurationToRank +func TestPeerDownloadDurationToRank(t *testing.T) { peers1 := []network.Peer{&mockHTTPPeer{address: "a1"}, &mockHTTPPeer{address: "a2"}, &mockHTTPPeer{address: "a3"}} peers2 := []network.Peer{&mockHTTPPeer{address: "b1"}, &mockHTTPPeer{address: "b2"}}