diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 1df5ab80a5cf..bbf5889d0e9f 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -1347,7 +1347,7 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) { defer tester.terminate() for i, tt := range tests { - // Register a new peer and ensure it's presence + // Register a new peer and ensure its presence id := fmt.Sprintf("test %d", i) if err := tester.newPeer(id, protocol, []common.Hash{tester.genesis.Hash()}, nil, nil, nil); err != nil { t.Fatalf("test %d: failed to register new peer: %v", i, err) diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index b9f16e673f94..7f55ce733574 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -235,8 +235,7 @@ func (q *queue) ShouldThrottleReceipts() bool { } // resultSlots calculates the number of results slots available for requests -// whilst adhering to both the item and the memory limit too of the results -// cache. +// whilst adhering to both the item and the memory limits of the result cache. func (q *queue) resultSlots(pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}) int { // Calculate the maximum length capped by the memory limit limit := len(q.resultCache) @@ -349,7 +348,7 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { } // Results retrieves and permanently removes a batch of fetch results from -// the cache. the result slice will be empty if the queue has been closed. +// the cache. The result slice will be empty if the queue has been closed. func (q *queue) Results(block bool) []*fetchResult { q.lock.Lock() defer q.lock.Unlock() @@ -511,7 +510,6 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common index := int(header.Number.Int64() - int64(q.resultOffset)) if index >= len(q.resultCache) || index < 0 { log.Error("index allocation went beyond available resultCache space", "index", index, "len.resultCache", len(q.resultCache), "blockNum", header.Number.Int64(), "resultOffset", q.resultOffset) - common.Report("index allocation went beyond available resultCache space") return nil, false, errInvalidChain } if q.resultCache[index] == nil { @@ -566,26 +564,29 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common // CancelHeaders aborts a fetch request, returning all pending skeleton indexes to the queue. func (q *queue) CancelHeaders(request *fetchRequest) { + q.lock.Lock() + defer q.lock.Unlock() q.cancel(request, q.headerTaskQueue, q.headerPendPool) } // CancelBodies aborts a body fetch request, returning all pending headers to the // task queue. func (q *queue) CancelBodies(request *fetchRequest) { + q.lock.Lock() + defer q.lock.Unlock() q.cancel(request, q.blockTaskQueue, q.blockPendPool) } // CancelReceipts aborts a body fetch request, returning all pending headers to // the task queue. func (q *queue) CancelReceipts(request *fetchRequest) { + q.lock.Lock() + defer q.lock.Unlock() q.cancel(request, q.receiptTaskQueue, q.receiptPendPool) } // Cancel aborts a fetch request, returning all pending hashes to the task queue. func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) { - q.lock.Lock() - defer q.lock.Unlock() - if request.From > 0 { taskQueue.Push(request.From, -int64(request.From)) }