sessionwantsender.go

package session

import (
	"context"

	bsbpm "github.com/ipfs/boxo/bitswap/client/internal/blockpresencemanager"
	cid "github.com/ipfs/go-cid"
	peer "github.com/libp2p/go-libp2p/core/peer"
)

const (
	// Maximum number of changes to accept before blocking
	changesBufferSize = 128
	// If the session receives this many DONT_HAVEs in a row from a peer,
	// it prunes the peer from the session
	peerDontHaveLimit = 16
)

// BlockPresence indicates whether a peer has a block.
// Note that the order is important: we decide which peer to send a want to
// based on whether we know the peer has the block. For example, we're more
// likely to send a want to a peer that has the block than to a peer that
// doesn't have it, so BPHave > BPDontHave.
type BlockPresence int

const (
	// BPDontHave means the peer has told us it does not have the block.
	BPDontHave BlockPresence = iota
	// BPUnknown means we don't yet know whether the peer has the block.
	BPUnknown
	// BPHave means the peer has told us it has the block.
	BPHave
)

// SessionWantsCanceller provides a method to cancel wants
type SessionWantsCanceller interface {
	// Cancel wants for this session
	CancelSessionWants(sid uint64, wants []cid.Cid)
}

// update encapsulates a message received by the session
type update struct {
	// Which peer sent the update
	from peer.ID
	// cids of blocks received
	ks []cid.Cid
	// HAVE message
	haves []cid.Cid
	// DONT_HAVE message
	dontHaves []cid.Cid
}

// peerAvailability indicates a peer's connection state
type peerAvailability struct {
	target    peer.ID
	available bool
}

// change can be new wants, a new message received by the session,
// or a change in the connection status of a peer
type change struct {
	// new wants requested
	add []cid.Cid
	// wants cancelled
	cancel []cid.Cid
	// new message received by session (blocks / HAVEs / DONT_HAVEs)
	update update
	// peer has connected / disconnected
	availability peerAvailability
}

type (
	onSendFn           func(to peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid)
	onPeersExhaustedFn func([]cid.Cid)
)

// sessionWantSender is responsible for sending want-have and want-block to
// peers. For each want, it sends a single optimistic want-block request to
// one peer and want-have requests to all other peers in the session.
// To choose the best peer for the optimistic want-block it maintains a list
// of how peers have responded to each want (HAVE / DONT_HAVE / Unknown) and
// consults the peer response tracker (records which peers sent us blocks).
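//
// A typical call sequence (an illustrative sketch, not prescriptive):
//
//	sws := newSessionWantSender(sid, pm, spm, canceller, bpm, onSend, onPeersExhausted)
//	go sws.Run()                           // process changes until Shutdown
//	sws.Add(ks)                            // ask for some CIDs
//	sws.Update(from, ks, haves, dontHaves) // feed in a message received from a peer
//	sws.Cancel(ks)                         // cancel wants that are no longer needed
//	sws.Shutdown()                         // stop the run loop and wait for it to exit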
type sessionWantSender struct {
	// The context is used when sending wants
	ctx context.Context
	// Called to shutdown the sessionWantSender
	shutdown func()
	// The sessionWantSender uses the closed channel to signal when it's
	// finished shutting down
	closed chan struct{}
	// The session ID
	sessionID uint64
	// A channel that collects incoming changes (events)
	changes chan change
	// Information about each want indexed by CID
	wants map[cid.Cid]*wantInfo
	// Keeps track of how many consecutive DONT_HAVEs a peer has sent
	peerConsecutiveDontHaves map[peer.ID]int
	// Tracks which peers we have sent want-blocks to
	swbt *sentWantBlocksTracker
	// Tracks the number of blocks each peer sent us
	peerRspTrkr *peerResponseTracker
	// Sends wants to peers
	pm PeerManager
	// Keeps track of peers in the session
	spm SessionPeerManager
	// Cancels wants
	canceller SessionWantsCanceller
	// Keeps track of which peer has / doesn't have a block
	bpm *bsbpm.BlockPresenceManager
	// Called when wants are sent
	onSend onSendFn
	// Called when all peers explicitly don't have a block
	onPeersExhausted onPeersExhaustedFn
}

func newSessionWantSender(sid uint64, pm PeerManager, spm SessionPeerManager, canceller SessionWantsCanceller,
	bpm *bsbpm.BlockPresenceManager, onSend onSendFn, onPeersExhausted onPeersExhaustedFn,
) sessionWantSender {
	ctx, cancel := context.WithCancel(context.Background())
	sws := sessionWantSender{
		ctx:                      ctx,
		shutdown:                 cancel,
		closed:                   make(chan struct{}),
		sessionID:                sid,
		changes:                  make(chan change, changesBufferSize),
		wants:                    make(map[cid.Cid]*wantInfo),
		peerConsecutiveDontHaves: make(map[peer.ID]int),
		swbt:                     newSentWantBlocksTracker(),
		peerRspTrkr:              newPeerResponseTracker(),

		pm:               pm,
		spm:              spm,
		canceller:        canceller,
		bpm:              bpm,
		onSend:           onSend,
		onPeersExhausted: onPeersExhausted,
	}

	return sws
}

func (sws *sessionWantSender) ID() uint64 {
	return sws.sessionID
}

// Add is called when new wants are added to the session
func (sws *sessionWantSender) Add(ks []cid.Cid) {
	if len(ks) == 0 {
		return
	}
	sws.addChange(change{add: ks})
}

// Cancel is called when a request is cancelled
func (sws *sessionWantSender) Cancel(ks []cid.Cid) {
	if len(ks) == 0 {
		return
	}
	sws.addChange(change{cancel: ks})
}

// Update is called when the session receives a message with incoming blocks
// or HAVE / DONT_HAVE
func (sws *sessionWantSender) Update(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
	hasUpdate := len(ks) > 0 || len(haves) > 0 || len(dontHaves) > 0
	if !hasUpdate {
		return
	}

	sws.addChange(change{
		update: update{from, ks, haves, dontHaves},
	})
}

// SignalAvailability is called by the PeerManager to signal that a peer has
// connected / disconnected
func (sws *sessionWantSender) SignalAvailability(p peer.ID, isAvailable bool) {
	availability := peerAvailability{p, isAvailable}
	// Add the change in a non-blocking manner to avoid the possibility of a
	// deadlock
	sws.addChangeNonBlocking(change{availability: availability})
}

// Run is the main loop for processing incoming changes
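// The loop is expected to run on its own goroutine; Shutdown() blocks until
// it exits (signalled by closing the closed channel).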
func (sws *sessionWantSender) Run() {
	for {
		select {
		case ch := <-sws.changes:
			sws.onChange([]change{ch})
		case <-sws.ctx.Done():
			// Unregister the session with the PeerManager
			sws.pm.UnregisterSession(sws.sessionID)

			// Close the 'closed' channel to signal to Shutdown() that the run
			// loop has exited
			close(sws.closed)
			return
		}
	}
}

// Shutdown the sessionWantSender
func (sws *sessionWantSender) Shutdown() {
	// Signal to the run loop to stop processing
	sws.shutdown()
	// Wait for run loop to complete
	<-sws.closed
}

// addChange adds a new change to the queue
func (sws *sessionWantSender) addChange(c change) {
	select {
	case sws.changes <- c:
	case <-sws.ctx.Done():
	}
}

// addChangeNonBlocking adds a new change to the queue, using a goroutine
// if the send would block, so as to avoid potential deadlocks
func (sws *sessionWantSender) addChangeNonBlocking(c change) {
	select {
	case sws.changes <- c:
	default:
		// changes channel is full, so add the change in a goroutine instead
		go func() {
			select {
			case sws.changes <- c:
			case <-sws.ctx.Done():
			}
		}()
	}
}

// collectChanges collects all the changes that have occurred since the last
// invocation of onChange
func (sws *sessionWantSender) collectChanges(changes []change) []change {
	for len(changes) < changesBufferSize {
		select {
		case next := <-sws.changes:
			changes = append(changes, next)
		default:
			return changes
		}
	}
	return changes
}

// onChange processes the next set of changes
func (sws *sessionWantSender) onChange(changes []change) {
	// Several changes may have been recorded since the last time we checked,
	// so pop all outstanding changes from the channel
	changes = sws.collectChanges(changes)

	// Apply each change
	availability := make(map[peer.ID]bool, len(changes))
	cancels := make([]cid.Cid, 0)
	var updates []update
	for _, chng := range changes {
		// Initialize info for new wants
		for _, c := range chng.add {
			sws.trackWant(c)
		}

		// Remove cancelled wants
		for _, c := range chng.cancel {
			sws.untrackWant(c)
			cancels = append(cancels, c)
		}

		// Consolidate updates and changes to availability
		if chng.update.from != "" {
			// If the update includes blocks or haves, treat it as signaling that
			// the peer is available
			if len(chng.update.ks) > 0 || len(chng.update.haves) > 0 {
				p := chng.update.from
				availability[p] = true

				// Register with the PeerManager
				sws.pm.RegisterSession(p, sws)
			}

			updates = append(updates, chng.update)
		}
		if chng.availability.target != "" {
			availability[chng.availability.target] = chng.availability.available
		}
	}

	// Update peer availability
	newlyAvailable, newlyUnavailable := sws.processAvailability(availability)

	// Update wants
	dontHaves := sws.processUpdates(updates)

	// Check if there are any wants for which all peers have indicated that
	// they don't have the block
	sws.checkForExhaustedWants(dontHaves, newlyUnavailable)

	// If there are any cancels, send them
	if len(cancels) > 0 {
		sws.canceller.CancelSessionWants(sws.sessionID, cancels)
	}

	// If there are some connected peers, send any pending wants
	if sws.spm.HasPeers() {
		sws.sendNextWants(newlyAvailable)
	}
}

// processAvailability updates the want queue with any changes in peer
// availability.
// It returns the peers that have become
//   - newly available
//   - newly unavailable
func (sws *sessionWantSender) processAvailability(availability map[peer.ID]bool) (avail []peer.ID, unavail []peer.ID) {
	var newlyAvailable []peer.ID
	var newlyUnavailable []peer.ID
	for p, isNowAvailable := range availability {
		stateChange := false
		if isNowAvailable {
			isNewPeer := sws.spm.AddPeer(p)
			if isNewPeer {
				stateChange = true
				newlyAvailable = append(newlyAvailable, p)
			}
		} else {
			wasAvailable := sws.spm.RemovePeer(p)
			if wasAvailable {
				stateChange = true
				newlyUnavailable = append(newlyUnavailable, p)
			}
		}

		// If the state has changed
		if stateChange {
			sws.updateWantsPeerAvailability(p, isNowAvailable)
			// Reset the count of consecutive DONT_HAVEs received from the
			// peer
			delete(sws.peerConsecutiveDontHaves, p)
		}
	}

	return newlyAvailable, newlyUnavailable
}

// trackWant creates a new entry in the map of CID -> want info
func (sws *sessionWantSender) trackWant(c cid.Cid) {
	if _, ok := sws.wants[c]; ok {
		return
	}

	// Create the want info
	wi := newWantInfo(sws.peerRspTrkr)
	sws.wants[c] = wi

	// For each available peer, register any information we know about
	// whether the peer has the block
	for _, p := range sws.spm.Peers() {
		sws.updateWantBlockPresence(c, p)
	}
}

// untrackWant removes an entry from the map of CID -> want info
func (sws *sessionWantSender) untrackWant(c cid.Cid) {
	delete(sws.wants, c)
}

// processUpdates processes incoming blocks and HAVE / DONT_HAVEs.
// It returns all DONT_HAVEs.
func (sws *sessionWantSender) processUpdates(updates []update) []cid.Cid {
	// Process received block keys
	blkCids := cid.NewSet()
	for _, upd := range updates {
		for _, c := range upd.ks {
			blkCids.Add(c)

			// Remove the want
			removed := sws.removeWant(c)
			if removed != nil {
				// Inform the peer tracker that this peer was the first to send
				// us the block
				sws.peerRspTrkr.receivedBlockFrom(upd.from)

				// Protect the connection to this peer so that we can ensure
				// that the connection doesn't get pruned by the connection
				// manager
				sws.spm.ProtectConnection(upd.from)
			}
			delete(sws.peerConsecutiveDontHaves, upd.from)
		}
	}

	// Process received DONT_HAVEs
	dontHaves := cid.NewSet()
	prunePeers := make(map[peer.ID]struct{})
	for _, upd := range updates {
		for _, c := range upd.dontHaves {
			// Track the number of consecutive DONT_HAVEs each peer has sent us
			if sws.peerConsecutiveDontHaves[upd.from] == peerDontHaveLimit {
				prunePeers[upd.from] = struct{}{}
			} else {
				sws.peerConsecutiveDontHaves[upd.from]++
			}

			// If we already received a block for the want, there's no need to
			// update block presence etc
			if blkCids.Has(c) {
				continue
			}

			dontHaves.Add(c)

			// Update the block presence for the peer
			sws.updateWantBlockPresence(c, upd.from)

			// Check if the DONT_HAVE is in response to a want-block
			// (could also be in response to want-have)
			if sws.swbt.haveSentWantBlockTo(upd.from, c) {
				// If we were waiting for a response from this peer, clear
				// sentTo so that we can send the want to another peer
				if sentTo, ok := sws.getWantSentTo(c); ok && sentTo == upd.from {
					sws.setWantSentTo(c, "")
				}
			}
		}
	}

	// Process received HAVEs
	for _, upd := range updates {
		for _, c := range upd.haves {
			// If we haven't already received a block for the want
			if !blkCids.Has(c) {
				// Update the block presence for the peer
				sws.updateWantBlockPresence(c, upd.from)
			}

			// Clear the consecutive DONT_HAVE count for the peer
			delete(sws.peerConsecutiveDontHaves, upd.from)
			delete(prunePeers, upd.from)
		}
	}

	// If any peers have sent us too many consecutive DONT_HAVEs, remove them
	// from the session
	for p := range prunePeers {
		// Before removing the peer from the session, check if the peer
		// sent us a HAVE for a block that we want
		for c := range sws.wants {
			if sws.bpm.PeerHasBlock(p, c) {
				delete(prunePeers, p)
				break
			}
		}
	}
	if len(prunePeers) > 0 {
		go func() {
			for p := range prunePeers {
				// Peer doesn't have anything we want, so remove it
				sws.bpm.RemovePeer(p)
				log.Infof("peer %s sent too many dont haves, removing from session %d", p, sws.ID())
				sws.SignalAvailability(p, false)
			}
		}()
	}

	return dontHaves.Keys()
}

// checkForExhaustedWants checks if there are any wants for which all peers
// have sent a DONT_HAVE. We call these "exhausted" wants.
func (sws *sessionWantSender) checkForExhaustedWants(dontHaves []cid.Cid, newlyUnavailable []peer.ID) {
	// If there are no new DONT_HAVEs, and no peers became unavailable, then
	// we don't need to check for exhausted wants
	if len(dontHaves) == 0 && len(newlyUnavailable) == 0 {
		return
	}

	// We need to check each want for which we just received a DONT_HAVE
	wants := dontHaves

	// If a peer just became unavailable, then we need to check all wants
	// (because it may be the last peer who hadn't sent a DONT_HAVE for a CID)
	if len(newlyUnavailable) > 0 {
		// Collect all pending wants
		wants = make([]cid.Cid, 0, len(sws.wants))
		for c := range sws.wants {
			wants = append(wants, c)
		}

		// If the last available peer in the session has become unavailable
		// then we need to broadcast all pending wants
		if !sws.spm.HasPeers() {
			sws.processExhaustedWants(wants)
			return
		}
	}

	// If all available peers for a cid sent a DONT_HAVE, signal to the session
	// that we've exhausted available peers
	if len(wants) > 0 {
		exhausted := sws.bpm.AllPeersDoNotHaveBlock(sws.spm.Peers(), wants)
		sws.processExhaustedWants(exhausted)
	}
}

// processExhaustedWants filters the list so that only those wants that haven't
// already been marked as exhausted are passed to onPeersExhausted()
func (sws *sessionWantSender) processExhaustedWants(exhausted []cid.Cid) {
	newlyExhausted := sws.newlyExhausted(exhausted)
	if len(newlyExhausted) > 0 {
		sws.onPeersExhausted(newlyExhausted)
	}
}

// convenience structs for passing around want-blocks and want-haves for a peer
type wantSets struct {
	wantBlocks *cid.Set
	wantHaves  *cid.Set
}

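// allWants maps each peer to the set of want-blocks and want-haves to be sent
// to that peer.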
type allWants map[peer.ID]*wantSets
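
// forPeer returns the wantSets for the given peer, creating an empty entry if
// one doesn't exist yet.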
func (aw allWants) forPeer(p peer.ID) *wantSets {
	if _, ok := aw[p]; !ok {
		aw[p] = &wantSets{
			wantBlocks: cid.NewSet(),
			wantHaves:  cid.NewSet(),
		}
	}
	return aw[p]
}

// sendNextWants sends wants to peers according to the latest information
// about which peers have / don't have blocks.
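// For each pending want that hasn't yet been sent as a want-block, it sends a
// want-block to the current best peer and want-haves to the other peers in
// the session (see the sessionWantSender doc comment above).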
func (sws *sessionWantSender) sendNextWants(newlyAvailable []peer.ID) {
	toSend := make(allWants)

	for c, wi := range sws.wants {
		// Ensure we send want-haves to any newly available peers
		for _, p := range newlyAvailable {
			toSend.forPeer(p).wantHaves.Add(c)
		}

		// We've already sent a want-block to a peer and haven't yet received
		// a response
		if wi.sentTo != "" {
			continue
		}

		// All the peers have indicated that they don't have the block
		// corresponding to this want, so we must wait to discover more peers
		if wi.bestPeer == "" {
			// TODO: work this out in real time instead of using bestP?
			continue
		}

		// Record that we are sending a want-block for this want to the peer
		sws.setWantSentTo(c, wi.bestPeer)

		// Send a want-block to the chosen peer
		toSend.forPeer(wi.bestPeer).wantBlocks.Add(c)

		// Send a want-have to each other peer
		for _, op := range sws.spm.Peers() {
			if op != wi.bestPeer {
				toSend.forPeer(op).wantHaves.Add(c)
			}
		}
	}

	// Send any wants we've collected
	sws.sendWants(toSend)
}

// sendWants sends want-haves and want-blocks to the appropriate peers
func (sws *sessionWantSender) sendWants(sends allWants) {
	// For each peer we're sending a request to
	for p, snd := range sends {
		// Piggyback some other want-haves onto the request to the peer
		for _, c := range sws.getPiggybackWantHaves(p, snd.wantBlocks) {
			snd.wantHaves.Add(c)
		}

		// Send the wants to the peer.
		// Note that the PeerManager ensures that we don't send duplicate
		// want-haves / want-blocks to a peer, and that want-blocks take
		// precedence over want-haves.
		wblks := snd.wantBlocks.Keys()
		whaves := snd.wantHaves.Keys()
		sws.pm.SendWants(sws.ctx, p, wblks, whaves)

		// Inform the session that we've sent the wants
		sws.onSend(p, wblks, whaves)

		// Record which peers we sent want-blocks to
		sws.swbt.addSentWantBlocksTo(p, wblks)
	}
}

// getPiggybackWantHaves gets the want-haves that should be piggybacked onto
// a request that we are making to send want-blocks to a peer
func (sws *sessionWantSender) getPiggybackWantHaves(p peer.ID, wantBlocks *cid.Set) []cid.Cid {
	var whs []cid.Cid
	for c := range sws.wants {
		// Don't send a want-have if we're already sending a want-block
		// (or have previously)
		if !wantBlocks.Has(c) && !sws.swbt.haveSentWantBlockTo(p, c) {
			whs = append(whs, c)
		}
	}
	return whs
}

// newlyExhausted filters the list of keys for wants that have not already
// been marked as exhausted (all peers indicated they don't have the block)
func (sws *sessionWantSender) newlyExhausted(ks []cid.Cid) []cid.Cid {
	var res []cid.Cid
	for _, c := range ks {
		if wi, ok := sws.wants[c]; ok {
			if !wi.exhausted {
				res = append(res, c)
				wi.exhausted = true
			}
		}
	}
	return res
}

// removeWant is called when the corresponding block is received
func (sws *sessionWantSender) removeWant(c cid.Cid) *wantInfo {
	if wi, ok := sws.wants[c]; ok {
		delete(sws.wants, c)
		return wi
	}
	return nil
}

// updateWantsPeerAvailability is called when the availability changes for a
// peer. It updates all the wants accordingly.
func (sws *sessionWantSender) updateWantsPeerAvailability(p peer.ID, isNowAvailable bool) {
	for c, wi := range sws.wants {
		if isNowAvailable {
			sws.updateWantBlockPresence(c, p)
		} else {
			wi.removePeer(p)
		}
	}
}

// updateWantBlockPresence is called when a HAVE / DONT_HAVE is received for
// the given want / peer
func (sws *sessionWantSender) updateWantBlockPresence(c cid.Cid, p peer.ID) {
	wi, ok := sws.wants[c]
	if !ok {
		return
	}

	// If the peer sent us a HAVE or DONT_HAVE for the cid, adjust the
	// block presence for the peer / cid combination
	switch {
	case sws.bpm.PeerHasBlock(p, c):
		wi.setPeerBlockPresence(p, BPHave)
	case sws.bpm.PeerDoesNotHaveBlock(p, c):
		wi.setPeerBlockPresence(p, BPDontHave)
	default:
		wi.setPeerBlockPresence(p, BPUnknown)
	}
}

// getWantSentTo returns which peer a want-block for the given CID was sent to
// (if any).
func (sws *sessionWantSender) getWantSentTo(c cid.Cid) (peer.ID, bool) {
	if wi, ok := sws.wants[c]; ok {
		return wi.sentTo, true
	}
	return "", false
}

// setWantSentTo records which peer a want-block for the given CID was sent to.
func (sws *sessionWantSender) setWantSentTo(c cid.Cid, p peer.ID) {
	if wi, ok := sws.wants[c]; ok {
		wi.sentTo = p
	}
}

// wantInfo keeps track of the information for a want
type wantInfo struct {
	// Tracks HAVE / DONT_HAVE sent to us for the want by each peer
	blockPresence map[peer.ID]BlockPresence
	// The peer that we've sent a want-block to (cleared when we get a response)
	sentTo peer.ID
	// The "best" peer to send the want to next
	bestPeer peer.ID
	// Keeps track of how many hits / misses each peer has sent us for wants
	// in the session
	peerRspTrkr *peerResponseTracker
	// true if all known peers have sent a DONT_HAVE for this want
	exhausted bool
}

// func newWantInfo(prt *peerResponseTracker, c cid.Cid, startIndex int) *wantInfo {
func newWantInfo(prt *peerResponseTracker) *wantInfo {
	return &wantInfo{
		blockPresence: make(map[peer.ID]BlockPresence),
		peerRspTrkr:   prt,
		exhausted:     false,
	}
}

// setPeerBlockPresence sets the block presence for the given peer
func (wi *wantInfo) setPeerBlockPresence(p peer.ID, bp BlockPresence) {
	wi.blockPresence[p] = bp
	wi.calculateBestPeer()

	// If a peer informed us that it has a block then make sure the want is no
	// longer flagged as exhausted (exhausted means no peers have the block)
	if bp == BPHave {
		wi.exhausted = false
	}
}

// removePeer deletes the given peer from the want info
func (wi *wantInfo) removePeer(p peer.ID) {
	// If we were waiting to hear back from the peer that is being removed,
	// clear the sentTo field so we no longer wait
	if p == wi.sentTo {
		wi.sentTo = ""
	}
	delete(wi.blockPresence, p)
	wi.calculateBestPeer()
}

// calculateBestPeer finds the best peer to send the want to next
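// For example, if peer A has sent a HAVE and peers B and C are still Unknown,
// A is chosen (BPHave > BPUnknown > BPDontHave). If several peers tie on the
// best block presence, one of them is picked via the peer response tracker,
// which records how many blocks each peer has sent us.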
func (wi *wantInfo) calculateBestPeer() {
	// Recalculate the best peer
	bestBP := BPDontHave
	bestPeer := peer.ID("")

	// Find the peer with the best block presence, recording how many peers
	// share the block presence
	countWithBest := 0
	for p, bp := range wi.blockPresence {
		if bp > bestBP {
			bestBP = bp
			bestPeer = p
			countWithBest = 1
		} else if bp == bestBP {
			countWithBest++
		}
	}
	wi.bestPeer = bestPeer

	// If no peer has a block presence better than DONT_HAVE, bail out
	if bestPeer == "" {
		return
	}

	// If there was only one peer with the best block presence, we're done
	if countWithBest <= 1 {
		return
	}

	// There were multiple peers with the best block presence, so choose one of
	// them to be the best
	var peersWithBest []peer.ID
	for p, bp := range wi.blockPresence {
		if bp == bestBP {
			peersWithBest = append(peersWithBest, p)
		}
	}
	wi.bestPeer = wi.peerRspTrkr.choose(peersWithBest)
}