From 2fe1405be75ba40100aee7cf3a41ab85becdd065 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 17 Apr 2020 17:28:04 -0400 Subject: [PATCH] feat: optimize entry sorting in MessageQueue --- internal/messagequeue/messagequeue.go | 25 ++++++++++++++++++++++--- message/message.go | 19 +++++++++++++++++++ 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/internal/messagequeue/messagequeue.go b/internal/messagequeue/messagequeue.go index d42db10d..4e245095 100644 --- a/internal/messagequeue/messagequeue.go +++ b/internal/messagequeue/messagequeue.go @@ -544,9 +544,28 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap mq.wllock.Lock() defer mq.wllock.Unlock() - // Get broadcast and regular wantlist entries - bcstEntries := mq.bcstWants.pending.SortedEntries() - peerEntries := mq.peerWants.pending.SortedEntries() + // Get broadcast and regular wantlist entries. + // SortedEntries() slows down the MessageQueue a lot, and entries only need + // to be sorted if the number of wants will overflow the size of the + // message (to make sure that the highest priority wants are sent in the + // first message). + // We prioritize cancels, then regular wants, then broadcast wants. + var peerEntries []bswl.Entry + var bcstEntries []bswl.Entry + maxCancelsSize := mq.cancels.Len() * bsmsg.MaxEntrySize + maxPeerSize := mq.peerWants.pending.Len() * bsmsg.MaxEntrySize + maxBcstSize := mq.bcstWants.pending.Len() * bsmsg.MaxEntrySize + + if maxCancelsSize+maxPeerSize < mq.maxMessageSize { + peerEntries = mq.peerWants.pending.Entries() + } else { + peerEntries = mq.peerWants.pending.SortedEntries() + } + if maxCancelsSize+maxPeerSize+maxBcstSize < mq.maxMessageSize { + bcstEntries = mq.bcstWants.pending.Entries() + } else { + bcstEntries = mq.bcstWants.pending.SortedEntries() + } // Size of the message so far msgSize := 0 diff --git a/message/message.go b/message/message.go index 8377ea73..f820c9dc 100644 --- a/message/message.go +++ b/message/message.go @@ -13,6 +13,7 @@ import ( pool "github.com/libp2p/go-buffer-pool" msgio "github.com/libp2p/go-msgio" + u "github.com/ipfs/go-ipfs-util" "github.com/libp2p/go-libp2p-core/network" ) @@ -118,6 +119,24 @@ func (e *Entry) ToPB() pb.Message_Wantlist_Entry { } } +var MaxEntrySize = maxEntrySize() + +func maxEntrySize() int { + var maxInt32 int32 = (1 << 31) - 1 + + c := cid.NewCidV0(u.Hash([]byte("cid"))) + e := Entry{ + Entry: wantlist.Entry{ + Cid: c, + Priority: maxInt32, + WantType: pb.Message_Wantlist_Have, + }, + SendDontHave: true, // true takes up more space than false + Cancel: true, + } + return e.Size() +} + type impl struct { full bool wantlist map[cid.Cid]*Entry