Skip to content

Commit

Permalink
fix: fix task stuck and not reassign bug in concurrent-fetch logic (b…
Browse files Browse the repository at this point in the history
  • Loading branch information
krish-nr authored Feb 1, 2024
1 parent 5d346de commit 8377b0e
Showing 1 changed file with 23 additions and 0 deletions.
23 changes: 23 additions & 0 deletions eth/downloader/fetchers_concurrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ import (
// to each request. Failing to do so is considered a protocol violation.
var timeoutGracePeriod = 2 * time.Minute

// peersRetryInterval is the retry interval when all peers cannot get the request data.
var peersRetryInterval = 100 * time.Millisecond

// maxRetries is the max retry time for unreserved download task
var maxRetries = 5

// typedQueue is an interface defining the adaptor needed to translate the type
// specific downloader/queue schedulers into the type-agnostic general concurrent
// fetcher algorithm calls.
Expand Down Expand Up @@ -125,6 +131,8 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error {

// Prepare the queue and fetch block parts until the block header fetcher's done
finished := false

requestRetried := 0
for {
// Short circuit if we lost all our peers
if d.peers.Len() == 0 && !beaconMode {
Expand Down Expand Up @@ -195,6 +203,10 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error {
// to the queue, that is async, and we can do better here by
// immediately pushing the unfulfilled requests.
queue.unreserve(peer.id) // TODO(karalabe): This needs a non-expiration method
//reset progressed
if len(pending) == 0 {
progressed = false
}
continue
}
pending[peer.id] = req
Expand All @@ -212,6 +224,17 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error {
if !progressed && !throttled && len(pending) == 0 && len(idles) == d.peers.Len() && queued > 0 && !beaconMode {
return errPeersUnavailable
}
// Retry the unreserved task in next loop
if beaconMode && len(pending) == 0 && queued > 0 && !progressed && !throttled && len(idles) == d.peers.Len() {
log.Warn("All idle peers are not valid for current task, will retry ...")
requestRetried++
if requestRetried > maxRetries {
log.Info("max retry exceeded, cancel request")
return errCanceled
}
time.Sleep(peersRetryInterval)
continue
}
}
// Wait for something to happen
select {
Expand Down

0 comments on commit 8377b0e

Please sign in to comment.