Skip to content

Commit

Permalink
connmgr: introduce abstractions and functions for decaying tags. (#104)
Browse files Browse the repository at this point in the history
  • Loading branch information
raulk authored May 14, 2020
1 parent 688f105 commit 70a8c2c
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 6 deletions.
93 changes: 93 additions & 0 deletions core/connmgr/decay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package connmgr

import (
"io"
"time"

"github.com/libp2p/go-libp2p-core/peer"
)

// Decayer is implemented by connection managers supporting decaying tags. A
// decaying tag is one whose value automatically decays over time.
//
// The actual application of the decay behaviour is encapsulated in a
// user-provided decaying function (DecayFn). The function is called on every
// tick (determined by the interval parameter), and returns either the new value
// of the tag, or whether it should be erased altogether.
//
// We do not set values on a decaying tag. Rather, we "bump" decaying tags by a
// delta. This calls the BumpFn with the old value and the delta, to determine
// the new value.
//
// Such a pluggable design affords a great deal of flexibility and versatility.
// Behaviours that are straightfoward to implement include:
//
// * Decay a tag by -1, or by half its current value, on every tick.
// * Every time a value is bumped, sum it to its current value.
// * Exponentially boost a score with every bump.
// * Sum the incoming score, but keep it within min, max bounds.
//
// Commonly used DecayFns and BumpFns are provided in the go-libp2p-connmgr
// module.
type Decayer interface {
io.Closer

// RegisterDecayingTag creates and registers a new decaying tag, if and only
// if a tag with the supplied name doesn't exist yet. Otherwise, an error is
// returned.
//
// The caller provides the interval at which the tag is refreshed, as well
// as the decay function and the bump function. Refer to godocs on DecayFn
// and BumpFn for more info.
RegisterDecayingTag(name string, interval time.Duration, decayFn DecayFn, bumpFn BumpFn) (DecayingTag, error)
}

// DecayFn applies a decay to the peer's score. The implementation must call
// DecayFn at the interval supplied when registering the tag.
//
// It receives a copy of the decaying value, and returns the score after
// applying the decay, as well as a flag to signal if the tag should be erased.
type DecayFn func(value DecayingValue) (after int, rm bool)

// BumpFn applies a delta onto an existing score, and returns the new score.
//
// Non-trivial bump functions include exponential boosting, moving averages,
// ceilings, etc.
type BumpFn func(value DecayingValue, delta int) (after int)

// DecayingTag represents a decaying tag. The tag is a long-lived general
// object, used to operate on tag values for peers.
type DecayingTag interface {
// Name returns the name of the tag.
Name() string

// Interval is the effective interval at which this tag will tick. Upon
// registration, the desired interval may be overwritten depending on the
// decayer's resolution, and this method allows you to obtain the effective
// interval.
Interval() time.Duration

// Bump applies a delta to a tag value, calling its bump function. The bump
// may be applied asynchronously, in which case the returned error is used
// to indicate an anomaly when queuing.
Bump(peer peer.ID, delta int) error
}

// DecayingValue represents a value for a decaying tag.
type DecayingValue struct {
// Tag points to the tag this value belongs to.
Tag DecayingTag

// Peer is the peer ID to whom this value is associated.
Peer peer.ID

// Added is the timestamp when this value was added for the first time for
// a tag and a peer.
Added time.Time

// LastVisit is the timestamp of the last visit.
LastVisit time.Time

// Value is the current value of the tag.
Value int
}
21 changes: 15 additions & 6 deletions core/connmgr/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,23 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
)

// ConnManager tracks connections to peers, and allows consumers to associate metadata
// with each peer.
// SupportsDecay evaluates if the provided ConnManager supports decay, and if
// so, it returns the Decayer object. Refer to godocs on Decayer for more info.
func SupportsDecay(mgr ConnManager) (Decayer, bool) {
d, ok := mgr.(Decayer)
return d, ok
}

// ConnManager tracks connections to peers, and allows consumers to associate
// metadata with each peer.
//
// It enables connections to be trimmed based on implementation-defined heuristics.
// The ConnManager allows libp2p to enforce an upper bound on the total number of
// open connections.
// It enables connections to be trimmed based on implementation-defined
// heuristics. The ConnManager allows libp2p to enforce an upper bound on the
// total number of open connections.
//
// ConnManagers supporting decaying tags implement Decayer. Use the
// SupportsDecay function to safely cast an instance to Decayer, if supported.
type ConnManager interface {

// TagPeer tags a peer with a string, associating a weight with the tag.
TagPeer(peer.ID, string, int)

Expand Down
67 changes: 67 additions & 0 deletions core/connmgr/presets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package connmgr

import (
"math"
"time"
)

// DecayNone applies no decay.
func DecayNone() DecayFn {
return func(value DecayingValue) (_ int, rm bool) {
return value.Value, false
}
}

// DecayFixed subtracts from by the provided minuend, and deletes the tag when
// first reaching 0 or negative.
func DecayFixed(minuend int) DecayFn {
return func(value DecayingValue) (_ int, rm bool) {
v := value.Value - minuend
return v, v <= 0
}
}

// DecayLinear applies a fractional coefficient to the value of the current tag,
// rounding down via math.Floor. It erases the tag when the result is zero.
func DecayLinear(coef float64) DecayFn {
return func(value DecayingValue) (after int, rm bool) {
v := math.Floor(float64(value.Value) * coef)
return int(v), v <= 0
}
}

// DecayExpireWhenInactive expires a tag after a certain period of no bumps.
func DecayExpireWhenInactive(after time.Duration) DecayFn {
return func(value DecayingValue) (_ int, rm bool) {
rm = value.LastVisit.Sub(time.Now()) >= after
return 0, rm
}
}

// BumpSumUnbounded adds the incoming value to the peer's score.
func BumpSumUnbounded() BumpFn {
return func(value DecayingValue, delta int) (after int) {
return value.Value + delta
}
}

// BumpSumBounded keeps summing the incoming score, keeping it within a
// [min, max] range.
func BumpSumBounded(min, max int) BumpFn {
return func(value DecayingValue, delta int) (after int) {
v := value.Value + delta
if v >= max {
return max
} else if v <= min {
return min
}
return v
}
}

// BumpOverwrite replaces the current value of the tag with the incoming one.
func BumpOverwrite() BumpFn {
return func(value DecayingValue, delta int) (after int) {
return delta
}
}

0 comments on commit 70a8c2c

Please sign in to comment.