diff --git a/fetch/fetch.go b/fetch/fetch.go index a6be410497..c5157308a7 100644 --- a/fetch/fetch.go +++ b/fetch/fetch.go @@ -84,6 +84,7 @@ type Config struct { BatchSize, QueueSize int RequestTimeout time.Duration // in seconds MaxRetriesForRequest int + PeersRateThreshold float64 `mapstructure:"peers-rate-threshold"` } // DefaultConfig is the default config for the fetch component. @@ -94,6 +95,7 @@ func DefaultConfig() Config { BatchSize: 20, RequestTimeout: time.Second * time.Duration(10), MaxRetriesForRequest: 100, + PeersRateThreshold: 0.02, } } @@ -174,12 +176,12 @@ func NewFetch( opts ...Option, ) *Fetch { bs := datastore.NewBlobStore(cdb.Database) + f := &Fetch{ cfg: DefaultConfig(), logger: log.NewNop(), bs: bs, host: host, - peers: peers.New(), servers: map[string]requester{}, unprocessed: make(map[types.Hash32]*request), ongoing: make(map[types.Hash32]*request), @@ -189,6 +191,11 @@ func NewFetch( for _, opt := range opts { opt(f) } + popts := []peers.Opt{} + if f.cfg.PeersRateThreshold != 0 { + popts = append(popts, peers.WithRateThreshold(f.cfg.PeersRateThreshold)) + } + f.peers = peers.New(popts...) // NOTE(dshulyak) this is to avoid tests refactoring. // there is one test that covers this part. if host != nil { diff --git a/fetch/peers/peers.go b/fetch/peers/peers.go index e57d8ac712..5a6c5f5b4d 100644 --- a/fetch/peers/peers.go +++ b/fetch/peers/peers.go @@ -21,11 +21,10 @@ func (p *data) successRate() float64 { return float64(p.success) / float64(p.success+p.failures) } -func (p *data) cmp(other *data) int { +func (p *data) cmp(other *data, rateThreshold float64) int { if p == nil && other != nil { return -1 } - const rateThreshold = 0.1 switch { case p.rate-other.rate > rateThreshold: return 1 @@ -41,13 +40,30 @@ func (p *data) cmp(other *data) int { return strings.Compare(string(p.id), string(other.id)) } -func New() *Peers { - return &Peers{peers: map[peer.ID]*data{}} +type Opt func(*Peers) + +func WithRateThreshold(rate float64) Opt { + return func(p *Peers) { + p.rateThreshold = rate + } +} + +func New(opts ...Opt) *Peers { + p := &Peers{ + peers: map[peer.ID]*data{}, + rateThreshold: 0.1, + } + for _, opt := range opts { + opt(p) + } + return p } type Peers struct { mu sync.Mutex peers map[peer.ID]*data + + rateThreshold float64 } func (p *Peers) Add(id peer.ID) { @@ -107,7 +123,7 @@ func (p *Peers) SelectBestFrom(peers []peer.ID) peer.ID { if !exist { continue } - if best.cmp(pdata) == -1 { + if best.cmp(pdata, p.rateThreshold) == -1 { best = pdata } } @@ -134,7 +150,7 @@ func (p *Peers) SelectBest(n int) []peer.ID { for _, peer := range p.peers { worst := peer for i := range cache { - if cache[i].cmp(worst) == -1 { + if cache[i].cmp(worst, p.rateThreshold) == -1 { cache[i], worst = worst, cache[i] } } diff --git a/fetch/peers/peers_test.go b/fetch/peers/peers_test.go index 2217e9e55d..356c68b532 100644 --- a/fetch/peers/peers_test.go +++ b/fetch/peers/peers_test.go @@ -19,7 +19,7 @@ type event struct { } func withEvents(events []event) *Peers { - tracker := New() + tracker := New(WithRateThreshold(0.1)) for _, ev := range events { if ev.delete { tracker.Delete(ev.id)