diff --git a/eth/downloader/downloader_qng.go b/eth/downloader/downloader_qng.go index f51603ba3607..e5b51e2146c2 100644 --- a/eth/downloader/downloader_qng.go +++ b/eth/downloader/downloader_qng.go @@ -3,8 +3,12 @@ package downloader import ( + "fmt" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/log" + "time" ) func (d *Downloader) SyncQng(peerid string, mode SyncMode, hash common.Hash) error { @@ -21,7 +25,7 @@ func (d *Downloader) SyncQng(peerid string, mode SyncMode, hash common.Hash) err return errBadPeer } log.Info("Attempting to retrieve sync target", "peer", peer.id, "head", hash.String()) - headers, metas, err := d.fetchHeadersByHash(peer, hash, 1, 0, false) + headers, metas, err := d.fetchQngHeadersByHash(peer, hash, 1, 0, false) if err != nil || len(headers) != 1 { log.Warn("Failed to fetch sync target", "headers", len(headers), "err", err) return err @@ -33,3 +37,45 @@ func (d *Downloader) SyncQng(peerid string, mode SyncMode, hash common.Hash) err } return d.BeaconSync(mode, headers[0], headers[0]) } + +func (d *Downloader) fetchQngHeadersByHash(p *peerConnection, hash common.Hash, amount int, skip int, reverse bool) ([]*types.Header, []common.Hash, error) { + // Create the response sink and send the network request + start := time.Now() + resCh := make(chan *eth.Response) + + req, err := p.peer.RequestHeadersByHash(hash, amount, skip, reverse, resCh) + if err != nil { + return nil, nil, err + } + defer func() { + fmt.Println(time.Now().String(), "测试3") + req.Close() + }() + + // Wait until the response arrives, the request is cancelled or times out + ttl := d.peers.rates.TargetTimeout() + + timeoutTimer := time.NewTimer(ttl) + defer timeoutTimer.Stop() + + select { + case <-timeoutTimer.C: + // Header retrieval timed out, update the metrics + p.log.Debug("Header request timed out", "elapsed", ttl) + headerTimeoutMeter.Mark(1) + + return nil, nil, errTimeout + + case res := <-resCh: + // Headers successfully retrieved, update the metrics + headerReqTimer.Update(time.Since(start)) + headerInMeter.Mark(int64(len(*res.Res.(*eth.BlockHeadersRequest)))) + + // Don't reject the packet even if it turns out to be bad, downloader will + // disconnect the peer on its own terms. Simply delivery the headers to + // be processed by the caller + res.Done <- nil + + return *res.Res.(*eth.BlockHeadersRequest), res.Meta.([]common.Hash), nil + } +}