Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

feat(messagequeue): rebroadcast wantlist #106

Merged
merged 3 commits into from
Apr 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 60 additions & 25 deletions messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ import (

var log = logging.Logger("bitswap")

const maxRetries = 10
const (
defaultRebroadcastInterval = 30 * time.Second
maxRetries = 10
)

// MessageNetwork is any network that can connect peers and generate a message
// sender.
Expand All @@ -33,21 +36,25 @@ type MessageQueue struct {
done chan struct{}

// do not touch out of run loop
wl *wantlist.SessionTrackedWantlist
nextMessage bsmsg.BitSwapMessage
nextMessageLk sync.RWMutex
sender bsnet.MessageSender
wl *wantlist.SessionTrackedWantlist
nextMessage bsmsg.BitSwapMessage
nextMessageLk sync.RWMutex
sender bsnet.MessageSender
rebroadcastIntervalLk sync.RWMutex
rebroadcastInterval time.Duration
rebroadcastTimer *time.Timer
}

// New creats a new MessageQueue.
func New(ctx context.Context, p peer.ID, network MessageNetwork) *MessageQueue {
return &MessageQueue{
ctx: ctx,
wl: wantlist.NewSessionTrackedWantlist(),
network: network,
p: p,
outgoingWork: make(chan struct{}, 1),
done: make(chan struct{}),
ctx: ctx,
wl: wantlist.NewSessionTrackedWantlist(),
network: network,
p: p,
outgoingWork: make(chan struct{}, 1),
done: make(chan struct{}),
rebroadcastInterval: defaultRebroadcastInterval,
}
}

Expand All @@ -64,27 +71,26 @@ func (mq *MessageQueue) AddMessage(entries []bsmsg.Entry, ses uint64) {

// AddWantlist adds a complete session tracked want list to a message queue
func (mq *MessageQueue) AddWantlist(initialWants *wantlist.SessionTrackedWantlist) {
mq.nextMessageLk.Lock()
defer mq.nextMessageLk.Unlock()

initialWants.CopyWants(mq.wl)
if initialWants.Len() > 0 {
if mq.nextMessage == nil {
mq.nextMessage = bsmsg.New(false)
}
for _, e := range initialWants.Entries() {
mq.nextMessage.AddEntry(e.Cid, e.Priority)
}
select {
case mq.outgoingWork <- struct{}{}:
default:
}
mq.addWantlist()
}

// SetRebroadcastInterval sets a new interval on which to rebroadcast the full wantlist
func (mq *MessageQueue) SetRebroadcastInterval(delay time.Duration) {
mq.rebroadcastIntervalLk.Lock()
mq.rebroadcastInterval = delay
if mq.rebroadcastTimer != nil {
mq.rebroadcastTimer.Reset(delay)
}
mq.rebroadcastIntervalLk.Unlock()
}

// Startup starts the processing of messages, and creates an initial message
// based on the given initial wantlist.
func (mq *MessageQueue) Startup() {
mq.rebroadcastIntervalLk.RLock()
mq.rebroadcastTimer = time.NewTimer(mq.rebroadcastInterval)
mq.rebroadcastIntervalLk.RUnlock()
go mq.runQueue()
}

Expand All @@ -96,6 +102,8 @@ func (mq *MessageQueue) Shutdown() {
func (mq *MessageQueue) runQueue() {
for {
select {
case <-mq.rebroadcastTimer.C:
mq.rebroadcastWantlist()
case <-mq.outgoingWork:
mq.sendMessage()
case <-mq.done:
Expand All @@ -112,6 +120,33 @@ func (mq *MessageQueue) runQueue() {
}
}

func (mq *MessageQueue) addWantlist() {

mq.nextMessageLk.Lock()
defer mq.nextMessageLk.Unlock()

if mq.wl.Len() > 0 {
if mq.nextMessage == nil {
mq.nextMessage = bsmsg.New(false)
}
for _, e := range mq.wl.Entries() {
mq.nextMessage.AddEntry(e.Cid, e.Priority)
}
select {
case mq.outgoingWork <- struct{}{}:
default:
}
}
}

func (mq *MessageQueue) rebroadcastWantlist() {
mq.rebroadcastIntervalLk.RLock()
mq.rebroadcastTimer.Reset(mq.rebroadcastInterval)
mq.rebroadcastIntervalLk.RUnlock()

mq.addWantlist()
}

func (mq *MessageQueue) addEntries(entries []bsmsg.Entry, ses uint64) bool {
var work bool
mq.nextMessageLk.Lock()
Expand Down
37 changes: 37 additions & 0 deletions messagequeue/messagequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,40 @@ func TestSendingMessagesPartialDupe(t *testing.T) {
}

}

func TestWantlistRebroadcast(t *testing.T) {

ctx := context.Background()
messagesSent := make(chan bsmsg.BitSwapMessage)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent}
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(ctx, peerID, fakenet)
ses := testutil.GenerateSessionID()
wl := testutil.GenerateWantlist(10, ses)

messageQueue.Startup()
messageQueue.AddWantlist(wl)
messages := collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
if len(messages) != 1 {
t.Fatal("wrong number of messages were sent for initial wants")
}

messageQueue.SetRebroadcastInterval(5 * time.Millisecond)
messages = collectMessages(ctx, t, messagesSent, 8*time.Millisecond)
if len(messages) != 1 {
t.Fatal("wrong number of messages were rebroadcast")
}

firstMessage := messages[0]
if len(firstMessage.Wantlist()) != wl.Len() {
t.Fatal("did not add all wants to want list")
}
for _, entry := range firstMessage.Wantlist() {
if entry.Cancel {
t.Fatal("initial add sent cancel entry when it should not have")
}
}
}