Skip to content

Commit

Permalink
sort connections by direction and number of streams in a memory emerg…
Browse files Browse the repository at this point in the history
…ency
  • Loading branch information
marten-seemann committed Dec 13, 2021
1 parent a8a2eaa commit 3412150
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 2 deletions.
36 changes: 34 additions & 2 deletions p2p/net/connmgr/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,38 @@ func (p peerInfos) SortByValue() {
})
}

func (p peerInfos) SortByValueAndStreams() {
sort.Slice(p, func(i, j int) bool {
left, right := p[i], p[j]
// temporary peers are preferred for pruning.
if left.temp != right.temp {
return left.temp
}
// otherwise, compare by value.
if left.value != right.value {
return left.value < right.value
}
incomingAndStreams := func(m map[network.Conn]time.Time) (incoming bool, numStreams int) {
for c := range m {
stat := c.Stat()
if stat.Direction == network.DirInbound {
incoming = true
}
numStreams += stat.NumStreams
}
return
}
leftIncoming, leftStreams := incomingAndStreams(left.conns)
rightIncoming, rightStreams := incomingAndStreams(right.conns)
// incoming connections are preferred for pruning
if leftIncoming != rightIncoming {
return leftIncoming
}
// prune connections with a higher number of streams first
return rightStreams < leftStreams
})
}

// TrimOpenConns closes the connections of as many peers as needed to make the peer count
// equal the low watermark. Peers are sorted in ascending order based on their total value,
// pruning those peers with the lowest scores first, as long as they are not within their
Expand Down Expand Up @@ -327,7 +359,7 @@ func (cm *BasicConnMgr) getConnsToCloseEmergency(target int) []network.Conn {
cm.plk.RUnlock()

// Sort peers according to their value.
candidates.SortByValue()
candidates.SortByValueAndStreams()

selected := make([]network.Conn, 0, target+10)
for _, inf := range candidates {
Expand Down Expand Up @@ -357,7 +389,7 @@ func (cm *BasicConnMgr) getConnsToCloseEmergency(target int) []network.Conn {
}
cm.plk.RUnlock()

candidates.SortByValue()
candidates.SortByValueAndStreams()
for _, inf := range candidates {
if target <= 0 {
break
Expand Down
78 changes: 78 additions & 0 deletions p2p/net/connmgr/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"
"time"

"github.com/libp2p/go-libp2p-core/crypto"

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

Expand Down Expand Up @@ -777,3 +779,79 @@ func TestConcurrentCleanupAndTagging(t *testing.T) {
cm.TagPeer(conn.RemotePeer(), "test", 20)
}
}

type mockConn struct {
stats network.ConnStats
}

func (m mockConn) Close() error { panic("implement me") }
func (m mockConn) LocalPeer() peer.ID { panic("implement me") }
func (m mockConn) LocalPrivateKey() crypto.PrivKey { panic("implement me") }
func (m mockConn) RemotePeer() peer.ID { panic("implement me") }
func (m mockConn) RemotePublicKey() crypto.PubKey { panic("implement me") }
func (m mockConn) LocalMultiaddr() ma.Multiaddr { panic("implement me") }
func (m mockConn) RemoteMultiaddr() ma.Multiaddr { panic("implement me") }
func (m mockConn) Stat() network.ConnStats { return m.stats }
func (m mockConn) ID() string { panic("implement me") }
func (m mockConn) NewStream(ctx context.Context) (network.Stream, error) { panic("implement me") }
func (m mockConn) GetStreams() []network.Stream { panic("implement me") }

func TestPeerInfoSorting(t *testing.T) {
t.Run("starts with temporary connections", func(t *testing.T) {
p1 := peerInfo{id: peer.ID("peer1")}
p2 := peerInfo{id: peer.ID("peer2"), temp: true}
pis := peerInfos{p1, p2}
pis.SortByValue()
require.Equal(t, pis, peerInfos{p2, p1})
})

t.Run("starts with low-value connections", func(t *testing.T) {
p1 := peerInfo{id: peer.ID("peer1"), value: 40}
p2 := peerInfo{id: peer.ID("peer2"), value: 20}
pis := peerInfos{p1, p2}
pis.SortByValue()
require.Equal(t, pis, peerInfos{p2, p1})
})

t.Run("in a memory emergency, starts with incoming connections", func(t *testing.T) {
incoming := network.ConnStats{}
incoming.Direction = network.DirInbound
outgoing := network.ConnStats{}
outgoing.Direction = network.DirOutbound
p1 := peerInfo{
id: peer.ID("peer1"),
conns: map[network.Conn]time.Time{
&mockConn{stats: outgoing}: time.Now(),
},
}
p2 := peerInfo{
id: peer.ID("peer2"),
conns: map[network.Conn]time.Time{
&mockConn{stats: outgoing}: time.Now(),
&mockConn{stats: incoming}: time.Now(),
},
}
pis := peerInfos{p1, p2}
pis.SortByValueAndStreams()
require.Equal(t, pis, peerInfos{p2, p1})
})

t.Run("in a memory emergency, starts with connections that have many streams", func(t *testing.T) {
p1 := peerInfo{
id: peer.ID("peer1"),
conns: map[network.Conn]time.Time{
&mockConn{stats: network.ConnStats{NumStreams: 100}}: time.Now(),
},
}
p2 := peerInfo{
id: peer.ID("peer2"),
conns: map[network.Conn]time.Time{
&mockConn{stats: network.ConnStats{NumStreams: 80}}: time.Now(),
&mockConn{stats: network.ConnStats{NumStreams: 40}}: time.Now(),
},
}
pis := peerInfos{p1, p2}
pis.SortByValueAndStreams()
require.Equal(t, pis, peerInfos{p2, p1})
})
}

0 comments on commit 3412150

Please sign in to comment.