diff --git a/bitswap/internal/messagequeue/messagequeue.go b/bitswap/internal/messagequeue/messagequeue.go index 4b3f090d7..1a8c2d5a5 100644 --- a/bitswap/internal/messagequeue/messagequeue.go +++ b/bitswap/internal/messagequeue/messagequeue.go @@ -113,8 +113,8 @@ func (r *recallWantlist) RemoveType(c cid.Cid, wtype pb.Message_Wantlist_WantTyp r.pending.RemoveType(c, wtype) } -// Sent moves the want from the pending to the sent list -func (r *recallWantlist) Sent(e bsmsg.Entry) { +// MarkSent moves the want from the pending to the sent list +func (r *recallWantlist) MarkSent(e wantlist.Entry) { r.pending.RemoveType(e.Cid, e.WantType) r.sent.Add(e.Cid, e.Priority, e.WantType) } @@ -439,7 +439,7 @@ func (mq *MessageQueue) sendMessage() { for i := 0; i < maxRetries; i++ { if mq.attemptSendAndRecovery(message) { // We were able to send successfully. - onSent(wantlist) + onSent() mq.simulateDontHaveWithTimeout(wantlist) @@ -540,7 +540,7 @@ func (mq *MessageQueue) pendingWorkCount() int { } // Convert the lists of wants into a Bitswap message -func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwapMessage, func([]bsmsg.Entry)) { +func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwapMessage, func()) { mq.wllock.Lock() defer mq.wllock.Unlock() @@ -566,6 +566,7 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap } // Add each regular want-have / want-block to the message + peerSent := make([]wantlist.Entry, 0, len(peerEntries)) for i := 0; i < len(peerEntries) && msgSize < mq.maxMessageSize; i++ { e := peerEntries[i] // If the remote peer doesn't support HAVE / DONT_HAVE messages, @@ -574,11 +575,13 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap mq.peerWants.RemoveType(e.Cid, pb.Message_Wantlist_Have) } else { msgSize += mq.msg.AddEntry(e.Cid, e.Priority, e.WantType, true) + peerSent = append(peerSent, e) } } // Add each broadcast want-have to the message - for i := 0; i < len(bcstEntries) && msgSize < mq.maxMessageSize; i++ { + bcstSentCount := 0 + for ; bcstSentCount < len(bcstEntries) && msgSize < mq.maxMessageSize; bcstSentCount++ { // Broadcast wants are sent as want-have wantType := pb.Message_Wantlist_Have @@ -588,41 +591,27 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap wantType = pb.Message_Wantlist_Block } - e := bcstEntries[i] + e := bcstEntries[bcstSentCount] msgSize += mq.msg.AddEntry(e.Cid, e.Priority, wantType, false) } // Called when the message has been successfully sent. - onMessageSent := func(wantlist []bsmsg.Entry) { - bcst := keysToSet(bcstEntries) - prws := keysToSet(peerEntries) - + onMessageSent := func() { mq.wllock.Lock() defer mq.wllock.Unlock() // Move the keys from pending to sent - for _, e := range wantlist { - if _, ok := bcst[e.Cid]; ok { - mq.bcstWants.Sent(e) - } - if _, ok := prws[e.Cid]; ok { - mq.peerWants.Sent(e) - } + for i := 0; i < bcstSentCount; i++ { + mq.bcstWants.MarkSent(bcstEntries[i]) + } + for _, e := range peerSent { + mq.peerWants.MarkSent(e) } } return mq.msg, onMessageSent } -// Convert wantlist entries into a set of cids -func keysToSet(wl []wantlist.Entry) map[cid.Cid]struct{} { - set := make(map[cid.Cid]struct{}, len(wl)) - for _, e := range wl { - set[e.Cid] = struct{}{} - } - return set -} - func (mq *MessageQueue) initializeSender() error { if mq.sender != nil { return nil