Skip to content

Commit

Permalink
Merge pull request algorand#2285 from algonautshant/shant/fixTestBasi…
Browse files Browse the repository at this point in the history
…cCatchpointCatchup

<!--
Thanks for submitting a pull request! We appreciate the time and effort you spent to get this far.

If you haven't already, please make sure that you've reviewed the CONTRIBUTING guide:
https://github.com/algorand/go-algorand/blob/master/CONTRIBUTING.md#code-guidelines

In particular ensure that you've run the following:
* make generate
* make sanity (which runs make fmt, make lint, make fix and make vet)

It is also a good idea to run tests:
* make test
* make integration
-->

## Summary
peerSelector.go: various bug fixes
- introduce peerSelectorPeer to wrap the network.Peer and add peerClass information, to be able to distinguish between peers of the same address but different classes.
- keep track of download failures to be able to exponentially increase the cost of each failure when failing more than succeeding. This is to evict the peer faster when constantly failing to download.
- initialize rankSum and rankSamples to initialRank of the class. Otherwise, the peer rank will have a very long warmup time before relfecting the correct rank.
- let resetRequestPenalty bound the rank within the class bounds. Otherwise, the penalty calculation pushes the rank out of the class bounds (bug).
- getNextPeer, rankPeer, and peerDownloadDurationToRank are local to the package, since they are using non-exported peerSelectorPeer
- getNextPeer, PeerDownloadDurationToRank and RankPeer use peerSelectorPeer instead of network.Peer
- refreshAvailablePeers distinguishes between peers with the same address but of different peer class
- findPeer returns the peer given the address and the peer class (instead of just the address)

catchpointCatchup_test.go:
- Remove comment about giving the second node all the stake, since it is not the case here.
- Use the round from the catchpoint instead of guessing the round as 36. In case the following catchpoint was obtained due to race conditions, checking for round 37 will be trivial, since it will also be obtained from the catchpoint.

catchpointService.go and service.go:
- Update the code to use peerSelectorPeer instead of network.Peer with peerSelector

peerSelector_test.go:
- Add new tests to check the peerSelector fixes in this PR
- Update the tests to use peerSelectorPeer instead of network.Peer with peerSelector
- Cleanup debugging printouts.

<!-- Explain the goal of this change and what problem it is solving. Format this cleanly so that it may be used for a commit message, as your changes will be squash-merged. -->

## Test Plan
Added tests to confirm the fixes. 
- TestClassUpperBound
- TestClassLowerBound
- TestEvictionAndUpgrade

<!-- How did you test these changes? Please provide the exact scenarios you tested in as much detail as possible including commands, output and rationale. -->
  • Loading branch information
tsachiherman authored Jun 21, 2021
2 parents 0cb300d + 9d8a460 commit 2824120
Show file tree
Hide file tree
Showing 5 changed files with 267 additions and 153 deletions.
60 changes: 31 additions & 29 deletions catchup/catchpointService.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) {
}
return cs.abort(fmt.Errorf("processStageLedgerDownload failed to reset staging balances : %v", err))
}
peer, err := peerSelector.GetNextPeer()
psp, err := peerSelector.getNextPeer()
peer := psp.Peer
if err != nil {
err = fmt.Errorf("processStageLedgerDownload: catchpoint catchup was unable to obtain a list of peers to retrieve the catchpoint file from")
return cs.abort(err)
Expand All @@ -293,9 +294,9 @@ func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) {
break
}
// failed to build the merkle trie for the above catchpoint file.
peerSelector.RankPeer(peer, peerRankInvalidDownload)
peerSelector.rankPeer(psp, peerRankInvalidDownload)
} else {
peerSelector.RankPeer(peer, peerRankDownloadFailed)
peerSelector.rankPeer(psp, peerRankDownloadFailed)
}

// instead of testing for err == cs.ctx.Err() , we'll check on the context itself.
Expand Down Expand Up @@ -345,11 +346,11 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err erro
for {
attemptsCount++

peer := network.Peer(0)
var psp *peerSelectorPeer
blockDownloadDuration := time.Duration(0)
if blk == nil {
var stop bool
blk, blockDownloadDuration, peer, stop, err = cs.fetchBlock(blockRound, uint64(attemptsCount))
blk, blockDownloadDuration, psp, stop, err = cs.fetchBlock(blockRound, uint64(attemptsCount))
if stop {
return err
} else if blk == nil {
Expand All @@ -364,7 +365,7 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err erro
if attemptsCount <= cs.config.CatchupBlockDownloadRetryAttempts {
// try again.
blk = nil
cs.blocksDownloadPeerSelector.RankPeer(peer, peerRankInvalidDownload)
cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload)
continue
}
return cs.abort(fmt.Errorf("processStageLastestBlockDownload: unsupported protocol version detected: '%v'", blk.BlockHeader.CurrentProtocol))
Expand All @@ -376,7 +377,7 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err erro
if attemptsCount <= cs.config.CatchupBlockDownloadRetryAttempts {
// try again.
blk = nil
cs.blocksDownloadPeerSelector.RankPeer(peer, peerRankInvalidDownload)
cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload)
continue
}
return cs.abort(fmt.Errorf("processStageLastestBlockDownload: genesis hash mismatches : genesis hash on genesis.json file is %v while genesis hash of downloaded block is %v", cs.ledger.GenesisHash(), blk.GenesisHash()))
Expand All @@ -389,7 +390,7 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err erro
if attemptsCount <= cs.config.CatchupBlockDownloadRetryAttempts {
// try again.
blk = nil
cs.blocksDownloadPeerSelector.RankPeer(peer, peerRankInvalidDownload)
cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload)
continue
}
return cs.abort(fmt.Errorf("processStageLastestBlockDownload: downloaded block content does not match downloaded block header"))
Expand All @@ -405,14 +406,14 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err erro
// try again.
blk = nil
cs.log.Infof("processStageLastestBlockDownload: block %d verification against catchpoint failed, another attempt will be made; err = %v", blockRound, err)
cs.blocksDownloadPeerSelector.RankPeer(peer, peerRankInvalidDownload)
cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload)
continue
}
return cs.abort(fmt.Errorf("processStageLastestBlockDownload failed when calling VerifyCatchpoint : %v", err))
}
// give a rank to the download, as the download was successful.
peerRank := cs.blocksDownloadPeerSelector.PeerDownloadDurationToRank(peer, blockDownloadDuration)
cs.blocksDownloadPeerSelector.RankPeer(peer, peerRank)
peerRank := cs.blocksDownloadPeerSelector.peerDownloadDurationToRank(psp, blockDownloadDuration)
cs.blocksDownloadPeerSelector.rankPeer(psp, peerRank)

err = cs.ledgerAccessor.StoreBalancesRound(cs.ctx, blk)
if err != nil {
Expand Down Expand Up @@ -495,11 +496,11 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
}
}

peer := network.Peer(0)
var psp *peerSelectorPeer
blockDownloadDuration := time.Duration(0)
if blk == nil {
var stop bool
blk, blockDownloadDuration, peer, stop, err = cs.fetchBlock(topBlock.Round()-basics.Round(blocksFetched), retryCount)
blk, blockDownloadDuration, psp, stop, err = cs.fetchBlock(topBlock.Round()-basics.Round(blocksFetched), retryCount)
if stop {
return err
} else if blk == nil {
Expand All @@ -515,7 +516,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
// not identical, retry download.
cs.log.Warnf("processStageBlocksDownload downloaded block(%d) did not match it's successor(%d) block hash %v != %v", blk.Round(), prevBlock.Round(), blk.Hash(), prevBlock.BlockHeader.Branch)
cs.updateBlockRetrievalStatistics(-1, 0)
cs.blocksDownloadPeerSelector.RankPeer(peer, peerRankInvalidDownload)
cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload)
if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) {
// try again.
retryCount++
Expand All @@ -528,7 +529,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
if _, ok := config.Consensus[blk.BlockHeader.CurrentProtocol]; !ok {
cs.log.Warnf("processStageBlocksDownload: unsupported protocol version detected: '%v'", blk.BlockHeader.CurrentProtocol)
cs.updateBlockRetrievalStatistics(-1, 0)
cs.blocksDownloadPeerSelector.RankPeer(peer, peerRankInvalidDownload)
cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload)
if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) {
// try again.
retryCount++
Expand All @@ -541,7 +542,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
if !blk.ContentsMatchHeader() {
cs.log.Warnf("processStageBlocksDownload: downloaded block content does not match downloaded block header")
// try again.
cs.blocksDownloadPeerSelector.RankPeer(peer, peerRankInvalidDownload)
cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload)
cs.updateBlockRetrievalStatistics(-1, 0)
if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) {
// try again.
Expand All @@ -552,8 +553,8 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
}

cs.updateBlockRetrievalStatistics(0, 1)
peerRank := cs.blocksDownloadPeerSelector.PeerDownloadDurationToRank(peer, blockDownloadDuration)
cs.blocksDownloadPeerSelector.RankPeer(peer, peerRank)
peerRank := cs.blocksDownloadPeerSelector.peerDownloadDurationToRank(psp, blockDownloadDuration)
cs.blocksDownloadPeerSelector.rankPeer(psp, peerRank)

// all good, persist and move on.
err = cs.ledgerAccessor.StoreBlock(cs.ctx, blk)
Expand Down Expand Up @@ -581,39 +582,40 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
// fetchBlock uses the internal peer selector blocksDownloadPeerSelector to pick a peer and then attempt to fetch the block requested from that peer.
// The method return stop=true if the caller should exit the current operation
// If the method return a nil block, the caller is expected to retry the operation, increasing the retry counter as needed.
func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount uint64) (blk *bookkeeping.Block, downloadDuration time.Duration, peer network.Peer, stop bool, err error) {
peer, err = cs.blocksDownloadPeerSelector.GetNextPeer()
func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount uint64) (blk *bookkeeping.Block, downloadDuration time.Duration, psp *peerSelectorPeer, stop bool, err error) {
psp, err = cs.blocksDownloadPeerSelector.getNextPeer()
peer := psp.Peer
if err != nil {
err = fmt.Errorf("fetchBlock: unable to obtain a list of peers to retrieve the latest block from")
return nil, time.Duration(0), peer, true, cs.abort(err)
return nil, time.Duration(0), psp, true, cs.abort(err)
}

httpPeer, validPeer := peer.(network.HTTPPeer)
if !validPeer {
cs.log.Warnf("fetchBlock: non-HTTP peer was provided by the peer selector")
cs.blocksDownloadPeerSelector.RankPeer(peer, peerRankInvalidDownload)
cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload)
if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) {
// try again.
return nil, time.Duration(0), peer, false, nil
return nil, time.Duration(0), psp, false, nil
}
return nil, time.Duration(0), peer, true, cs.abort(fmt.Errorf("fetchBlock: recurring non-HTTP peer was provided by the peer selector"))
return nil, time.Duration(0), psp, true, cs.abort(fmt.Errorf("fetchBlock: recurring non-HTTP peer was provided by the peer selector"))
}
fetcher := makeUniversalBlockFetcher(cs.log, cs.net, cs.config)
blk, _, downloadDuration, err = fetcher.fetchBlock(cs.ctx, round, httpPeer)
if err != nil {
if cs.ctx.Err() != nil {
return nil, time.Duration(0), peer, true, cs.stopOrAbort()
return nil, time.Duration(0), psp, true, cs.stopOrAbort()
}
if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) {
// try again.
cs.log.Infof("Failed to download block %d on attempt %d out of %d. %v", round, retryCount, cs.config.CatchupBlockDownloadRetryAttempts, err)
cs.blocksDownloadPeerSelector.RankPeer(peer, peerRankDownloadFailed)
return nil, time.Duration(0), peer, false, nil
cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankDownloadFailed)
return nil, time.Duration(0), psp, false, nil
}
return nil, time.Duration(0), peer, true, cs.abort(fmt.Errorf("fetchBlock failed after multiple blocks download attempts"))
return nil, time.Duration(0), psp, true, cs.abort(fmt.Errorf("fetchBlock failed after multiple blocks download attempts"))
}
// success
return blk, downloadDuration, peer, false, nil
return blk, downloadDuration, psp, false, nil
}

// processStageLedgerDownload is the fifth catchpoint catchup stage. It completes the catchup process, swap the new tables and restart the node functionality.
Expand Down
Loading

0 comments on commit 2824120

Please sign in to comment.