
Commit

Merge pull request #86 from libp2p/memory-emergency
aggressively trim connections when we're running out of memory
marten-seemann authored Dec 13, 2021
2 parents dfe8394 + ff0577e commit 1a3d6a6
Showing 2 changed files with 249 additions and 80 deletions.
231 changes: 171 additions & 60 deletions p2p/net/connmgr/connmgr.go
@@ -13,6 +13,7 @@ import (

logging "github.com/ipfs/go-log/v2"
ma "github.com/multiformats/go-multiaddr"
"github.com/raulk/go-watchdog"
)

var log = logging.Logger("connmgr")
@@ -34,16 +35,19 @@ type BasicConnMgr struct {
protected map[peer.ID]map[string]struct{}

// channel-based semaphore that enforces only a single trim is in progress
trimRunningCh chan struct{}
trimTrigger chan chan<- struct{}
connCount int32
trimMutex sync.Mutex
connCount int32
// to be accessed atomically. This is mimicking the implementation of a sync.Once.
// Take care of correct alignment when modifying this struct.
trimCount uint64

lastTrimMu sync.RWMutex
lastTrim time.Time

refCount sync.WaitGroup
ctx context.Context
cancel func()
refCount sync.WaitGroup
ctx context.Context
cancel func()
unregisterWatchdog func()
}

var (
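
The new trimCount field above carries a warning about alignment. The underlying rule is general Go, not specific to this change: 64-bit atomic operations panic on 32-bit platforms (ARM, x86-32, 32-bit MIPS) unless the address is 8-byte aligned, and the runtime only guarantees that for the first word of an allocated struct. A minimal, self-contained illustration with a toy struct (not from this commit):

package main

import (
	"fmt"
	"sync/atomic"
)

// counters places the uint64 first, so it is 8-byte aligned even on
// 32-bit targets, where a position after int32 fields could leave it
// misaligned.
type counters struct {
	trimCount uint64
	connCount int32
}

func main() {
	c := new(counters) // heap-allocated: the first word is 64-bit aligned
	atomic.AddUint64(&c.trimCount, 1)
	fmt.Println(atomic.LoadUint64(&c.trimCount))
}
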
@@ -112,10 +116,8 @@ func NewConnManager(low, hi int, opts ...Option) (*BasicConnMgr, error) {
}

cm := &BasicConnMgr{
cfg: cfg,
trimRunningCh: make(chan struct{}, 1),
trimTrigger: make(chan chan<- struct{}),
protected: make(map[peer.ID]map[string]struct{}, 16),
cfg: cfg,
protected: make(map[peer.ID]map[string]struct{}, 16),
segments: func() (ret segments) {
for i := range ret {
ret[i] = &segment{
@@ -127,6 +129,9 @@ func NewConnManager(low, hi int, opts ...Option) (*BasicConnMgr, error) {
}
cm.ctx, cm.cancel = context.WithCancel(context.Background())

// When we're running low on memory, immediately trigger a trim.
cm.unregisterWatchdog = watchdog.RegisterPostGCNotifee(cm.memoryEmergency)

decay, _ := NewDecayer(cfg.decayer, cm)
cm.decayer = decay
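
For context, here is a standalone sketch of the hook being registered above. It assumes only what the diff itself shows of go-watchdog's API: RegisterPostGCNotifee takes a callback to run after garbage-collection passes observed by the watchdog, and returns a function that unregisters it again.

package main

import (
	"log"

	"github.com/raulk/go-watchdog"
)

func main() {
	// Run a callback whenever the watchdog reports memory pressure via a
	// completed GC; keep the returned function to remove the hook later.
	unregister := watchdog.RegisterPostGCNotifee(func() {
		log.Println("low on memory: trim connections to the low watermark")
	})
	defer unregister()

	// ... run the application; the callback may fire at any time ...
}
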

@@ -135,8 +140,39 @@ func NewConnManager(low, hi int, opts ...Option) (*BasicConnMgr, error) {
return cm, nil
}

// memoryEmergency is run when we run low on memory.
// Close connections until we reach the low watermark.
// We don't pay attention to the silence period or the grace period.
// We try not to kill protected connections, but if that turns out to be necessary, no connection is safe!
func (cm *BasicConnMgr) memoryEmergency() {
connCount := int(atomic.LoadInt32(&cm.connCount))
target := connCount - cm.cfg.lowWater
if target < 0 {
log.Warnw("Low on memory, but we only have a few connections", "num", connCount, "low watermark", cm.cfg.lowWater)
return
}
log.Warnf("Low on memory. Closing %d connections.", target)

cm.trimMutex.Lock()
defer atomic.AddUint64(&cm.trimCount, 1)
defer cm.trimMutex.Unlock()

// Trim connections without paying attention to the silence period.
for _, c := range cm.getConnsToCloseEmergency(target) {
log.Infow("low on memory. closing conn", "peer", c.RemotePeer())
c.Close()
}

// finally, update the last trim time.
cm.lastTrimMu.Lock()
cm.lastTrim = time.Now()
cm.lastTrimMu.Unlock()
}

func (cm *BasicConnMgr) Close() error {
cm.cancel()
cm.unregisterWatchdog()
if err := cm.decayer.Close(); err != nil {
return err
}
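
The trimMutex/trimCount pair used above (and by doTrim further down) is a small request-coalescing pattern, described in the code as mimicking sync.Once: a caller that waited on the mutex while another trim ran sees the bumped counter and skips its own trim. A self-contained sketch of the same idea, with hypothetical names:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// coalescer runs fn at most once per generation: goroutines that arrive
// while fn is in flight block on the mutex, then observe the bumped
// generation counter and return without running fn again.
type coalescer struct {
	mu  sync.Mutex
	gen uint64 // accessed atomically, like trimCount above
}

func (c *coalescer) do(fn func()) {
	gen := atomic.LoadUint64(&c.gen)
	c.mu.Lock()
	defer c.mu.Unlock()
	if gen == atomic.LoadUint64(&c.gen) { // nobody ran fn while we waited
		fn()
		atomic.AddUint64(&c.gen, 1)
	}
}

func main() {
	var c coalescer
	var runs int32
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.do(func() { atomic.AddInt32(&runs, 1) })
		}()
	}
	wg.Wait()
	fmt.Println("runs:", runs) // 1 when the goroutines overlap; more if they don't
}
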
@@ -202,6 +238,52 @@ type peerInfo struct {
firstSeen time.Time // timestamp when we began tracking this peer.
}

type peerInfos []peerInfo

func (p peerInfos) SortByValue() {
sort.Slice(p, func(i, j int) bool {
left, right := p[i], p[j]
// temporary peers are preferred for pruning.
if left.temp != right.temp {
return left.temp
}
// otherwise, compare by value.
return left.value < right.value
})
}

func (p peerInfos) SortByValueAndStreams() {
sort.Slice(p, func(i, j int) bool {
left, right := p[i], p[j]
// temporary peers are preferred for pruning.
if left.temp != right.temp {
return left.temp
}
// otherwise, compare by value.
if left.value != right.value {
return left.value < right.value
}
incomingAndStreams := func(m map[network.Conn]time.Time) (incoming bool, numStreams int) {
for c := range m {
stat := c.Stat()
if stat.Direction == network.DirInbound {
incoming = true
}
numStreams += stat.NumStreams
}
return
}
leftIncoming, leftStreams := incomingAndStreams(left.conns)
rightIncoming, rightStreams := incomingAndStreams(right.conns)
// incoming connections are preferred for pruning
if leftIncoming != rightIncoming {
return leftIncoming
}
// prune connections with a higher number of streams first
return rightStreams < leftStreams
})
}

// TrimOpenConns closes the connections of as many peers as needed to make the peer count
// equal the low watermark. Peers are sorted in ascending order based on their total value,
// pruning those peers with the lowest scores first, as long as they are not within their
@@ -210,26 +292,11 @@ type peerInfo struct {
// This function blocks until a trim is completed. If a trim is underway, a new
// one won't be started, and instead it'll wait until that one is completed before
// returning.
func (cm *BasicConnMgr) TrimOpenConns(ctx context.Context) {
func (cm *BasicConnMgr) TrimOpenConns(_ context.Context) {
// TODO: error return value so we can cleanly signal we are aborting because:
// (a) there's another trim in progress, or (b) the silence period is in effect.

// Trigger a trim.
ch := make(chan struct{})
select {
case cm.trimTrigger <- ch:
case <-cm.ctx.Done():
case <-ctx.Done():
// TODO: return an error?
}

// Wait for the trim.
select {
case <-ch:
case <-cm.ctx.Done():
case <-ctx.Done():
// TODO: return an error?
}
cm.doTrim()
}

func (cm *BasicConnMgr) background() {
@@ -244,49 +311,101 @@ func (cm *BasicConnMgr) background() {
defer ticker.Stop()

for {
var waiting chan<- struct{}
select {
case <-ticker.C:
if atomic.LoadInt32(&cm.connCount) < int32(cm.cfg.highWater) {
// Below high water, skip.
continue
}
case waiting = <-cm.trimTrigger:
case <-cm.ctx.Done():
return
}
cm.trim()
}
}

// Notify anyone waiting on this trim.
if waiting != nil {
close(waiting)
}

for {
select {
case waiting = <-cm.trimTrigger:
if waiting != nil {
close(waiting)
}
continue
default:
}
break
}
func (cm *BasicConnMgr) doTrim() {
// This logic is mimicking the implementation of sync.Once in the standard library.
count := atomic.LoadUint64(&cm.trimCount)
cm.trimMutex.Lock()
defer cm.trimMutex.Unlock()
if count == atomic.LoadUint64(&cm.trimCount) {
cm.trim()
cm.lastTrimMu.Lock()
cm.lastTrim = time.Now()
cm.lastTrimMu.Unlock()
atomic.AddUint64(&cm.trimCount, 1)
}
}

// trim starts the trim, if the last trim happened before the configured silence period.
func (cm *BasicConnMgr) trim() {
// do the actual trim.
for _, c := range cm.getConnsToClose() {
log.Info("closing conn: ", c.RemotePeer())
log.Infow("closing conn", "peer", c.RemotePeer())
c.Close()
}
}

// finally, update the last trim time.
cm.lastTrimMu.Lock()
cm.lastTrim = time.Now()
cm.lastTrimMu.Unlock()
func (cm *BasicConnMgr) getConnsToCloseEmergency(target int) []network.Conn {
candidates := make(peerInfos, 0, cm.segments.countPeers())

cm.plk.RLock()
for _, s := range cm.segments {
s.Lock()
for id, inf := range s.peers {
if _, ok := cm.protected[id]; ok {
// skip over protected peer.
continue
}
candidates = append(candidates, *inf)
}
s.Unlock()
}
cm.plk.RUnlock()

// Sort peers according to their value.
candidates.SortByValueAndStreams()

selected := make([]network.Conn, 0, target+10)
for _, inf := range candidates {
if target <= 0 {
break
}
for c := range inf.conns {
selected = append(selected, c)
}
target -= len(inf.conns)
}
if target <= 0 {
// We found enough connections that were not protected.
return selected
}

// We didn't find enough unprotected connections.
// We have no choice but to kill some protected connections.
candidates = candidates[:0]
cm.plk.RLock()
for _, s := range cm.segments {
s.Lock()
for _, inf := range s.peers {
candidates = append(candidates, *inf)
}
s.Unlock()
}
cm.plk.RUnlock()

candidates.SortByValueAndStreams()
for _, inf := range candidates {
if target <= 0 {
break
}
for c := range inf.conns {
selected = append(selected, c)
}
target -= len(inf.conns)
}
return selected
}

// getConnsToClose runs the heuristics described in TrimOpenConns and returns the
@@ -302,7 +421,7 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
return nil
}

candidates := make([]peerInfo, 0, cm.segments.countPeers())
candidates := make(peerInfos, 0, cm.segments.countPeers())
var ncandidates int
gracePeriodStart := time.Now().Add(-cm.cfg.gracePeriod)

@@ -337,15 +456,7 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
}

// Sort peers according to their value.
sort.Slice(candidates, func(i, j int) bool {
left, right := candidates[i], candidates[j]
// temporary peers are preferred for pruning.
if left.temp != right.temp {
return left.temp
}
// otherwise, compare by value.
return left.value < right.value
})
candidates.SortByValue()

target := ncandidates - cm.cfg.lowWater

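To make the pruning order concrete, here is a toy run of the comparison rules SortByValueAndStreams applies in the emergency path: temporary peers go first, then lower aggregate value, then peers with inbound connections, then higher stream counts. The types are simplified stand-ins, not the real peerInfo or network.Conn:

package main

import (
	"fmt"
	"sort"
)

// peer is a simplified stand-in for the connmgr's peerInfo.
type peer struct {
	name     string
	temp     bool // not yet fully tracked; pruned first
	value    int  // aggregate score; lower values pruned earlier
	inbound  bool // has an inbound conn; pruned before outbound-only peers
	nStreams int  // among otherwise-equal peers, more streams pruned first
}

func main() {
	peers := []peer{
		{name: "outbound-quiet", value: 10, nStreams: 1},
		{name: "temporary", temp: true, value: 50},
		{name: "inbound-busy", value: 10, inbound: true, nStreams: 9},
		{name: "valuable", value: 80, inbound: true},
	}
	sort.Slice(peers, func(i, j int) bool {
		l, r := peers[i], peers[j]
		if l.temp != r.temp {
			return l.temp
		}
		if l.value != r.value {
			return l.value < r.value
		}
		if l.inbound != r.inbound {
			return l.inbound
		}
		return r.nStreams < l.nStreams
	})
	fmt.Println(peers) // prune order: temporary, inbound-busy, outbound-quiet, valuable
}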
98 changes: 78 additions & 20 deletions (second changed file not rendered)
