catchup: fetchAndWrite/fetchRound quit early on errNoBlockForRound (#…

algorandskiy authored Nov 8, 2023
1 parent 7ebb9f4 commit 03efd42
Showing 4 changed files with 145 additions and 40 deletions.
40 changes: 25 additions & 15 deletions catchup/peerSelector.go
@@ -51,6 +51,10 @@ const (
peerRank4LowBlockTime = 801
peerRank4HighBlockTime = 999

// peerRankNoBlockForRound is used for responses that failed because the peer has no block for the requested round.
// This indicates the peer is either behind, the block has not been produced yet, or the peer no longer holds a block that old.
peerRankNoBlockForRound = 2000

// peerRankDownloadFailed is used for responses which could be temporary, such as missing files, or for which we don't
// have a clear resolution
peerRankDownloadFailed = 10000
@@ -143,7 +147,7 @@ func makeHistoricStatus(windowSize int, class peerClass) *historicStats {
// that will determine the rank of the peer.
hs := historicStats{
windowSize: windowSize,
rankSamples: make([]int, windowSize, windowSize),
rankSamples: make([]int, windowSize),
requestGaps: make([]uint64, 0, windowSize),
rankSum: uint64(class.initialRank) * uint64(windowSize),
gapSum: 0.0}
@@ -229,18 +233,24 @@ func (hs *historicStats) push(value int, counter uint64, class peerClass) (avera

// Download may fail for various reasons. Give it additional tries
// and see if it recovers/improves.
if value == peerRankDownloadFailed {
factor := float64(1.0)
switch value {
// - Set the rank to the class upper bound multiplied
// by the number of downloadFailures.
// - Each downloadFailure increments the counter, and
// each non-failure decrements it, until it gets to 0.
// - When the peer is consistently failing to
// download, the value added to rankSum will
// increase at an increasing rate to evict the peer
// from the class sooner.
case peerRankNoBlockForRound:
// for no-block errors, apply a much gentler rank increase
factor = 0.1
fallthrough
case peerRankDownloadFailed:
hs.downloadFailures++
// - Set the rank to the class upper bound multiplied
// by the number of downloadFailures.
// - Each downloadFailure increments the counter, and
// each non-failure decrements it, until it gets to 0.
// - When the peer is consistently failing to
// download, the value added to rankSum will
// increase at an increasing rate to evict the peer
// from the class sooner.
value = upperBound(class) * int(math.Exp2(float64(hs.downloadFailures)))
} else {
value = upperBound(class) * int(math.Exp2(float64(hs.downloadFailures)*factor))
default:
if hs.downloadFailures > 0 {
hs.downloadFailures--
}
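
To see how the new factor softens the penalty growth, here is a small standalone sketch (not part of the commit); classUpperBound is a hypothetical stand-in for the class's upperBound value, using 999 (the fourth-class bound shown above):

package main

import (
	"fmt"
	"math"
)

func main() {
	const classUpperBound = 999 // hypothetical class bound, e.g. the fourth-class upper bound
	for _, factor := range []float64{1.0, 0.1} {
		fmt.Printf("factor=%.1f\n", factor)
		for failures := 1; failures <= 10; failures++ {
			// same formula as in push() above
			rank := classUpperBound * int(math.Exp2(float64(failures)*factor))
			fmt.Printf("  downloadFailures=%2d -> rank %d\n", failures, rank)
		}
	}
}

With factor 1.0 the rank doubles on every failure, while with factor 0.1 the integer multiplier only reaches 2 after ten consecutive no-block responses, so peers that merely lack a recent block are demoted far more slowly.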
@@ -252,12 +262,12 @@ func (hs *historicStats) push(value int, counter uint64, class peerClass) (avera
// The average performance of the peer
average := float64(hs.rankSum) / float64(len(hs.rankSamples))

if int(average) > upperBound(class) && initialRank == peerRankDownloadFailed {
if int(average) > upperBound(class) && (initialRank == peerRankDownloadFailed || initialRank == peerRankNoBlockForRound) {
// peerRankDownloadFailed will be delayed, to give the peer
// additional time to improve. If it does not improve over time,
// the average will exceed the class limit. At this point,
// it will be pushed down to the download-failed class.
return peerRankDownloadFailed
return initialRank
}

// A penalty is added relative to how frequently the peer is used
@@ -470,7 +480,7 @@ func (ps *peerSelector) refreshAvailablePeers() {
for peerIdx := len(pool.peers) - 1; peerIdx >= 0; peerIdx-- {
peer := pool.peers[peerIdx].peer
if peerAddress := peerAddress(peer); peerAddress != "" {
if toRemove, _ := existingPeers[pool.peers[peerIdx].class.peerClass][peerAddress]; toRemove {
if toRemove := existingPeers[pool.peers[peerIdx].class.peerClass][peerAddress]; toRemove {
// need to be removed.
pool.peers = append(pool.peers[:peerIdx], pool.peers[peerIdx+1:]...)
}
46 changes: 44 additions & 2 deletions catchup/service.go
@@ -243,6 +243,8 @@ func (s *Service) innerFetch(ctx context.Context, r basics.Round, peer network.P
return
}

const errNoBlockForRoundThreshold = 5

// fetchAndWrite fetches a block, checks the cert, and writes it to the ledger. Cert checking and ledger writing both wait for the ledger to advance if necessary.
// Returns false if we should stop trying to catch up. This may occur for several reasons:
// - If the context is canceled (e.g. if the node is shutting down)
@@ -254,6 +256,11 @@ func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCo
if dontSyncRound := s.GetDisableSyncRound(); dontSyncRound != 0 && r >= basics.Round(dontSyncRound) {
return false
}

// peerErrors tracks occurrences of errNoBlockForRound in order to quit earlier without making
// repeated requests for a block that most likely does not exist yet
peerErrors := map[network.Peer]int{}

i := 0
for {
i++
@@ -302,8 +309,19 @@
s.log.Infof("fetchAndWrite(%d): the block is already in the ledger. The catchup is complete", r)
return false
}
failureRank := peerRankDownloadFailed
if err == errNoBlockForRound {
failureRank = peerRankNoBlockForRound
// remote peer doesn't have the block, try another peer
// quit if the same peer encountered errNoBlockForRound more than errNoBlockForRoundThreshold times
if count := peerErrors[peer]; count > errNoBlockForRoundThreshold {
s.log.Infof("fetchAndWrite(%d): remote peers do not have the block. Quitting", r)
return false
}
peerErrors[peer]++
}
s.log.Debugf("fetchAndWrite(%v): Could not fetch: %v (attempt %d)", r, err, i)
peerSelector.rankPeer(psp, peerRankDownloadFailed)
peerSelector.rankPeer(psp, failureRank)

// we've just failed to retrieve a block; wait until the previous block is fetched before trying again
// to avoid the use case where the first block doesn't exist, and we're making many requests down the chain
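
As a distilled view of the per-peer counting pattern above, here is a standalone sketch (not from the commit; the fetch call is simulated and plain strings stand in for network.Peer, and the error value and threshold are redefined locally):

package main

import (
	"errors"
	"fmt"
)

// stand-ins for the real definitions, for illustration only
var errNoBlockForRound = errors.New("no block available for given round")

const errNoBlockForRoundThreshold = 5

func main() {
	peerErrors := map[string]int{}
	peer := "peerA"

	for attempt := 1; attempt <= 10; attempt++ {
		err := errNoBlockForRound // simulate the peer repeatedly lacking the block
		failureRank := "peerRankDownloadFailed"
		if errors.Is(err, errNoBlockForRound) {
			failureRank = "peerRankNoBlockForRound"
			if peerErrors[peer] > errNoBlockForRoundThreshold {
				fmt.Printf("attempt %d: threshold exceeded, stop retrying\n", attempt)
				return
			}
			peerErrors[peer]++
		}
		fmt.Printf("attempt %d: rank peer with %s and retry\n", attempt, failureRank)
	}
}

The real loop additionally ranks the peer through the peer selector and waits for the previous block fetch before retrying, as the surrounding code shows.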
@@ -689,6 +707,8 @@ func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.Asy
return
}

peerErrors := map[network.Peer]int{}

blockHash := bookkeeping.BlockHash(cert.Proposal.BlockDigest) // semantic digest (i.e., hash of the block header), not byte-for-byte digest
peerSelector := createPeerSelector(s.net, s.cfg, false)
for s.ledger.LastRound() < cert.Round {
@@ -710,8 +730,30 @@
return
default:
}
failureRank := peerRankDownloadFailed
if err == errNoBlockForRound {
failureRank = peerRankNoBlockForRound
// If a peer does not have the block after a few attempts, it probably has not persisted the block yet.
// Give it some time to persist the block and try again.
// Note, there is no exit condition on too many retries, as per the function contract.
if count, ok := peerErrors[peer]; ok {
if count > errNoBlockForRoundThreshold {
time.Sleep(50 * time.Millisecond)
}
if count > errNoBlockForRoundThreshold*10 {
// with a low number of connected peers (like 2) the following scenario is possible:
// - both peers do not have the block
// - the peer selector punishes one of the peers more than the other
// - the punished peer gets the block, while the less punished peer remains stuck.
// In this case, reset the peer selector to let it re-learn priorities.
peerSelector = createPeerSelector(s.net, s.cfg, false)
}
}
peerErrors[peer]++
}
// remote peer doesn't have the block, try another peer
logging.Base().Warnf("fetchRound could not acquire block, fetcher errored out: %v", err)
peerSelector.rankPeer(psp, peerRankDownloadFailed)
peerSelector.rankPeer(psp, failureRank)
continue
}

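To summarize the escalation in the fetchRound path above, a small sketch (not part of the commit) prints which action the loop takes as a peer's errNoBlockForRound count grows:

package main

import "fmt"

const errNoBlockForRoundThreshold = 5

func main() {
	// Walk the per-peer error count and show the escalation used above:
	// retry immediately, then back off 50ms, then also reset the peer
	// selector so it can re-learn priorities.
	for count := 0; count <= errNoBlockForRoundThreshold*10+2; count++ {
		switch {
		case count > errNoBlockForRoundThreshold*10:
			fmt.Printf("count=%2d: sleep 50ms and reset the peer selector\n", count)
		case count > errNoBlockForRoundThreshold:
			fmt.Printf("count=%2d: sleep 50ms before retrying\n", count)
		default:
			fmt.Printf("count=%2d: retry immediately\n", count)
		}
	}
}

Unlike fetchAndWrite, this path never gives up on its own; per the function contract it keeps retrying until the certified round is fetched.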
81 changes: 76 additions & 5 deletions catchup/service_test.go
@@ -20,7 +20,9 @@ import (
"context"
"errors"
"math/rand"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

@@ -180,6 +182,27 @@ func (cl *periodicSyncLogger) Warnf(s string, args ...interface{}) {
cl.Logger.Warnf(s, args...)
}

type periodicSyncDebugLogger struct {
periodicSyncLogger
debugMsgFilter []string
debugMsgs atomic.Uint32
}

func (cl *periodicSyncDebugLogger) Debugf(s string, args ...interface{}) {
// save debug messages for later inspection.
if len(cl.debugMsgFilter) > 0 {
for _, filter := range cl.debugMsgFilter {
if strings.Contains(s, filter) {
cl.debugMsgs.Add(1)
break
}
}
} else {
cl.debugMsgs.Add(1)
}
cl.Logger.Debugf(s, args...)
}

func TestSyncRound(t *testing.T) {
partitiontest.PartitionTest(t)

@@ -208,7 +231,7 @@ func TestSyncRound(t *testing.T) {

auth := &mockedAuthenticator{fail: true}
initialLocalRound := local.LastRound()
require.True(t, 0 == initialLocalRound)
require.Zero(t, initialLocalRound)

// Make Service
localCfg := config.GetDefaultLocal()
@@ -253,7 +276,7 @@
s.UnsetDisableSyncRound()
// wait until the catchup is done
waitStart = time.Now()
for time.Now().Sub(waitStart) < 8*s.deadlineTimeout {
for time.Since(waitStart) < 8*s.deadlineTimeout {
if remote.LastRound() == local.LastRound() {
break
}
@@ -298,7 +321,7 @@ func TestPeriodicSync(t *testing.T) {

auth := &mockedAuthenticator{fail: true}
initialLocalRound := local.LastRound()
require.True(t, 0 == initialLocalRound)
require.Zero(t, initialLocalRound)

// Make Service
s := MakeService(logging.Base(), defaultConfig, net, local, auth, nil, nil)
@@ -315,7 +338,7 @@
// wait until the catchup is done. Since we might have missed the sleep window, we need to wait
// until the synchronization is complete.
waitStart := time.Now()
for time.Now().Sub(waitStart) < 10*s.deadlineTimeout {
for time.Since(waitStart) < 10*s.deadlineTimeout {
if remote.LastRound() == local.LastRound() {
break
}
@@ -506,7 +529,6 @@ func TestServiceFetchBlocksMultiBlocks(t *testing.T) {
localBlock, err := local.Block(i)
require.NoError(t, err)
require.Equal(t, *blk, localBlock)
return
}
}

Expand Down Expand Up @@ -1184,3 +1206,52 @@ func TestServiceLedgerUnavailable(t *testing.T) {
require.Greater(t, local.LastRound(), basics.Round(0))
require.Less(t, local.LastRound(), remote.LastRound())
}

// TestServiceNoBlockForRound checks that fetchAndWrite does not retry 500 times when a block is not available
func TestServiceNoBlockForRound(t *testing.T) {
partitiontest.PartitionTest(t)

// Make Ledger
local := new(mockedLedger)
local.blocks = append(local.blocks, bookkeeping.Block{})

remote, _, blk, err := buildTestLedger(t, bookkeeping.Block{})
if err != nil {
t.Fatal(err)
return
}
numBlocks := 10
addBlocks(t, remote, blk, numBlocks)

// Create a network and block service
blockServiceConfig := config.GetDefaultLocal()
net := &httpTestPeerSource{}
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
net.addPeer(rootURL)

require.Equal(t, basics.Round(0), local.LastRound())
require.Equal(t, basics.Round(numBlocks+1), remote.LastRound())

// Make Service
auth := &mockedAuthenticator{fail: false}
cfg := config.GetDefaultLocal()
cfg.CatchupParallelBlocks = 8
s := MakeService(logging.Base(), cfg, net, local, auth, nil, nil)
pl := &periodicSyncDebugLogger{periodicSyncLogger: periodicSyncLogger{Logger: logging.Base()}}
s.log = pl
s.deadlineTimeout = 1 * time.Second

s.testStart()
defer s.Stop()
s.sync()

// without the fix there are about 2k messages (4x catchupRetryLimit)
// with the fix, expect fewer than catchupRetryLimit
require.Less(t, int(pl.debugMsgs.Load()), catchupRetryLimit)
}
18 changes: 0 additions & 18 deletions test/e2e-go/features/catchup/basicCatchup_test.go
@@ -73,24 +73,6 @@ func TestBasicCatchup(t *testing.T) {
// Now, catch up
err = fixture.LibGoalFixture.ClientWaitForRoundWithTimeout(cloneClient, waitForRound)
a.NoError(err)

cloneNC := fixture.GetNodeControllerForDataDir(cloneDataDir)
cloneRestClient := fixture.GetAlgodClientForController(cloneNC)

// an immediate call for ready will error, for sync time != 0
a.Error(cloneRestClient.ReadyCheck())

for {
status, err := cloneRestClient.Status()
a.NoError(err)

if status.LastRound < 10 {
time.Sleep(250 * time.Millisecond)
continue
}
a.NoError(cloneRestClient.ReadyCheck())
break
}
}

// TestCatchupOverGossip tests catchup across network versions
