Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
Merge pull request #106 from ipfs/feat/rebroadcast-wantlist
Browse files Browse the repository at this point in the history
feat(messagequeue): rebroadcast wantlist
  • Loading branch information
Stebalien authored Apr 10, 2019
2 parents 85e3f43 + 13e0a4d commit 2c47a55
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 25 deletions.
85 changes: 60 additions & 25 deletions messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ import (

var log = logging.Logger("bitswap")

const maxRetries = 10
const (
defaultRebroadcastInterval = 30 * time.Second
maxRetries = 10
)

// MessageNetwork is any network that can connect peers and generate a message
// sender.
Expand All @@ -33,21 +36,25 @@ type MessageQueue struct {
done chan struct{}

// do not touch out of run loop
wl *wantlist.SessionTrackedWantlist
nextMessage bsmsg.BitSwapMessage
nextMessageLk sync.RWMutex
sender bsnet.MessageSender
wl *wantlist.SessionTrackedWantlist
nextMessage bsmsg.BitSwapMessage
nextMessageLk sync.RWMutex
sender bsnet.MessageSender
rebroadcastIntervalLk sync.RWMutex
rebroadcastInterval time.Duration
rebroadcastTimer *time.Timer
}

// New creats a new MessageQueue.
func New(ctx context.Context, p peer.ID, network MessageNetwork) *MessageQueue {
return &MessageQueue{
ctx: ctx,
wl: wantlist.NewSessionTrackedWantlist(),
network: network,
p: p,
outgoingWork: make(chan struct{}, 1),
done: make(chan struct{}),
ctx: ctx,
wl: wantlist.NewSessionTrackedWantlist(),
network: network,
p: p,
outgoingWork: make(chan struct{}, 1),
done: make(chan struct{}),
rebroadcastInterval: defaultRebroadcastInterval,
}
}

Expand All @@ -64,27 +71,26 @@ func (mq *MessageQueue) AddMessage(entries []bsmsg.Entry, ses uint64) {

// AddWantlist adds a complete session tracked want list to a message queue
func (mq *MessageQueue) AddWantlist(initialWants *wantlist.SessionTrackedWantlist) {
mq.nextMessageLk.Lock()
defer mq.nextMessageLk.Unlock()

initialWants.CopyWants(mq.wl)
if initialWants.Len() > 0 {
if mq.nextMessage == nil {
mq.nextMessage = bsmsg.New(false)
}
for _, e := range initialWants.Entries() {
mq.nextMessage.AddEntry(e.Cid, e.Priority)
}
select {
case mq.outgoingWork <- struct{}{}:
default:
}
mq.addWantlist()
}

// SetRebroadcastInterval sets a new interval on which to rebroadcast the full wantlist
func (mq *MessageQueue) SetRebroadcastInterval(delay time.Duration) {
mq.rebroadcastIntervalLk.Lock()
mq.rebroadcastInterval = delay
if mq.rebroadcastTimer != nil {
mq.rebroadcastTimer.Reset(delay)
}
mq.rebroadcastIntervalLk.Unlock()
}

// Startup starts the processing of messages, and creates an initial message
// based on the given initial wantlist.
func (mq *MessageQueue) Startup() {
mq.rebroadcastIntervalLk.RLock()
mq.rebroadcastTimer = time.NewTimer(mq.rebroadcastInterval)
mq.rebroadcastIntervalLk.RUnlock()
go mq.runQueue()
}

Expand All @@ -96,6 +102,8 @@ func (mq *MessageQueue) Shutdown() {
func (mq *MessageQueue) runQueue() {
for {
select {
case <-mq.rebroadcastTimer.C:
mq.rebroadcastWantlist()
case <-mq.outgoingWork:
mq.sendMessage()
case <-mq.done:
Expand All @@ -112,6 +120,33 @@ func (mq *MessageQueue) runQueue() {
}
}

func (mq *MessageQueue) addWantlist() {

mq.nextMessageLk.Lock()
defer mq.nextMessageLk.Unlock()

if mq.wl.Len() > 0 {
if mq.nextMessage == nil {
mq.nextMessage = bsmsg.New(false)
}
for _, e := range mq.wl.Entries() {
mq.nextMessage.AddEntry(e.Cid, e.Priority)
}
select {
case mq.outgoingWork <- struct{}{}:
default:
}
}
}

func (mq *MessageQueue) rebroadcastWantlist() {
mq.rebroadcastIntervalLk.RLock()
mq.rebroadcastTimer.Reset(mq.rebroadcastInterval)
mq.rebroadcastIntervalLk.RUnlock()

mq.addWantlist()
}

func (mq *MessageQueue) addEntries(entries []bsmsg.Entry, ses uint64) bool {
var work bool
mq.nextMessageLk.Lock()
Expand Down
37 changes: 37 additions & 0 deletions messagequeue/messagequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,40 @@ func TestSendingMessagesPartialDupe(t *testing.T) {
}

}

func TestWantlistRebroadcast(t *testing.T) {

ctx := context.Background()
messagesSent := make(chan bsmsg.BitSwapMessage)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent}
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(ctx, peerID, fakenet)
ses := testutil.GenerateSessionID()
wl := testutil.GenerateWantlist(10, ses)

messageQueue.Startup()
messageQueue.AddWantlist(wl)
messages := collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
if len(messages) != 1 {
t.Fatal("wrong number of messages were sent for initial wants")
}

messageQueue.SetRebroadcastInterval(5 * time.Millisecond)
messages = collectMessages(ctx, t, messagesSent, 8*time.Millisecond)
if len(messages) != 1 {
t.Fatal("wrong number of messages were rebroadcast")
}

firstMessage := messages[0]
if len(firstMessage.Wantlist()) != wl.Len() {
t.Fatal("did not add all wants to want list")
}
for _, entry := range firstMessage.Wantlist() {
if entry.Cancel {
t.Fatal("initial add sent cancel entry when it should not have")
}
}
}

0 comments on commit 2c47a55

Please sign in to comment.