Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

feat: optimize entry sorting in MessageQueue #356

Merged
merged 1 commit into from
Apr 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions internal/messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,9 +544,28 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap
mq.wllock.Lock()
defer mq.wllock.Unlock()

// Get broadcast and regular wantlist entries
bcstEntries := mq.bcstWants.pending.SortedEntries()
peerEntries := mq.peerWants.pending.SortedEntries()
// Get broadcast and regular wantlist entries.
// SortedEntries() slows down the MessageQueue a lot, and entries only need
// to be sorted if the number of wants will overflow the size of the
// message (to make sure that the highest priority wants are sent in the
// first message).
// We prioritize cancels, then regular wants, then broadcast wants.
var peerEntries []bswl.Entry
var bcstEntries []bswl.Entry
maxCancelsSize := mq.cancels.Len() * bsmsg.MaxEntrySize
maxPeerSize := mq.peerWants.pending.Len() * bsmsg.MaxEntrySize
maxBcstSize := mq.bcstWants.pending.Len() * bsmsg.MaxEntrySize

if maxCancelsSize+maxPeerSize < mq.maxMessageSize {
peerEntries = mq.peerWants.pending.Entries()
} else {
peerEntries = mq.peerWants.pending.SortedEntries()
}
if maxCancelsSize+maxPeerSize+maxBcstSize < mq.maxMessageSize {
bcstEntries = mq.bcstWants.pending.Entries()
} else {
bcstEntries = mq.bcstWants.pending.SortedEntries()
}

// Size of the message so far
msgSize := 0
Expand Down
19 changes: 19 additions & 0 deletions message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
pool "github.com/libp2p/go-buffer-pool"
msgio "github.com/libp2p/go-msgio"

u "github.com/ipfs/go-ipfs-util"
"github.com/libp2p/go-libp2p-core/network"
)

Expand Down Expand Up @@ -118,6 +119,24 @@ func (e *Entry) ToPB() pb.Message_Wantlist_Entry {
}
}

var MaxEntrySize = maxEntrySize()

func maxEntrySize() int {
var maxInt32 int32 = (1 << 31) - 1

c := cid.NewCidV0(u.Hash([]byte("cid")))
e := Entry{
Entry: wantlist.Entry{
Cid: c,
Priority: maxInt32,
WantType: pb.Message_Wantlist_Have,
},
SendDontHave: true, // true takes up more space than false
Cancel: true,
}
return e.Size()
}

type impl struct {
full bool
wantlist map[cid.Cid]*Entry
Expand Down