Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expire messages from the cache based on last seen time #513

Merged
merged 6 commits into from
Jan 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions blacklist.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package pubsub

import (
"sync"
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/whyrusleeping/timecache"

"github.com/libp2p/go-libp2p-pubsub/timecache"
)

// Blacklist is an interface for peer blacklisting.
Expand Down Expand Up @@ -34,8 +34,7 @@ func (b MapBlacklist) Contains(p peer.ID) bool {

// TimeCachedBlacklist is a blacklist implementation using a time cache
type TimeCachedBlacklist struct {
sync.RWMutex
tc *timecache.TimeCache
tc timecache.TimeCache
smrz2001 marked this conversation as resolved.
Show resolved Hide resolved
}

// NewTimeCachedBlacklist creates a new TimeCachedBlacklist with the given expiry duration
Expand All @@ -46,8 +45,6 @@ func NewTimeCachedBlacklist(expiry time.Duration) (Blacklist, error) {

// Add returns a bool saying whether Add of peer was successful
func (b *TimeCachedBlacklist) Add(p peer.ID) bool {
b.Lock()
defer b.Unlock()
s := p.String()
if b.tc.Has(s) {
return false
Expand All @@ -57,8 +54,5 @@ func (b *TimeCachedBlacklist) Add(p peer.ID) bool {
}

func (b *TimeCachedBlacklist) Contains(p peer.ID) bool {
b.RLock()
defer b.RUnlock()

return b.tc.Has(p.String())
}
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ go 1.17

require (
github.com/benbjohnson/clock v1.3.0
github.com/emirpasic/gods v1.18.1
smrz2001 marked this conversation as resolved.
Show resolved Hide resolved
github.com/gogo/protobuf v1.3.2
github.com/ipfs/go-log v1.0.5
github.com/libp2p/go-buffer-pool v0.1.0
github.com/libp2p/go-libp2p v0.22.0
github.com/libp2p/go-libp2p-testing v0.12.0
github.com/libp2p/go-msgio v0.2.0
github.com/multiformats/go-multiaddr v0.6.0
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee
github.com/multiformats/go-varint v0.0.6
)

require (
Expand All @@ -36,7 +38,6 @@ require (
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/klauspost/compress v1.15.1 // indirect
github.com/klauspost/cpuid/v2 v2.1.0 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
github.com/libp2p/go-netroute v0.2.0 // indirect
github.com/libp2p/go-openssl v0.1.0 // indirect
Expand All @@ -62,7 +63,6 @@ require (
github.com/multiformats/go-multicodec v0.5.0 // indirect
github.com/multiformats/go-multihash v0.2.1 // indirect
github.com/multiformats/go-multistream v0.3.3 // indirect
github.com/multiformats/go-varint v0.0.6 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/opencontainers/runtime-spec v1.0.2 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25Kn
github.com/elastic/gosigar v0.12.0/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
github.com/elastic/gosigar v0.14.2 h1:Dg80n8cr90OZ7x+bAax/QjoW/XqTI11RmA79ZwIm9/4=
github.com/elastic/gosigar v0.14.2/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand Down Expand Up @@ -544,8 +546,6 @@ github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljT
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow=
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
24 changes: 19 additions & 5 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p-pubsub/timecache"

"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/discovery"
Expand All @@ -20,7 +21,6 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"

logging "github.com/ipfs/go-log"
"github.com/whyrusleeping/timecache"
)

// DefaultMaximumMessageSize is 1mb.
Expand All @@ -31,6 +31,10 @@ var (
// Use WithSeenMessagesTTL to configure this per pubsub instance, instead of overriding the global default.
TimeCacheDuration = 120 * time.Second

// TimeCacheStrategy specifies which type of lookup/cleanup strategy is used by the seen messages cache.
// Use WithSeenMessagesStrategy to configure this per pubsub instance, instead of overriding the global default.
TimeCacheStrategy = timecache.Strategy_FirstSeen

// ErrSubscriptionCancelled may be returned when a subscription Next() is called after the
// subscription has been cancelled.
ErrSubscriptionCancelled = errors.New("subscription cancelled")
Expand Down Expand Up @@ -148,9 +152,10 @@ type PubSub struct {
inboundStreamsMx sync.Mutex
inboundStreams map[peer.ID]network.Stream

seenMessagesMx sync.Mutex
seenMessages *timecache.TimeCache
seenMsgTTL time.Duration
seenMessagesMx sync.Mutex
seenMessages timecache.TimeCache
seenMsgTTL time.Duration
seenMsgStrategy timecache.Strategy

// generator used to compute the ID for a message
idGen *msgIDGenerator
Expand Down Expand Up @@ -286,6 +291,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
blacklist: NewMapBlacklist(),
blacklistPeer: make(chan peer.ID),
seenMsgTTL: TimeCacheDuration,
seenMsgStrategy: TimeCacheStrategy,
idGen: newMsgIdGenerator(),
counter: uint64(time.Now().UnixNano()),
}
Expand All @@ -307,7 +313,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
}
}

ps.seenMessages = timecache.NewTimeCache(ps.seenMsgTTL)
ps.seenMessages = timecache.NewTimeCacheWithStrategy(ps.seenMsgStrategy, ps.seenMsgTTL)

if err := ps.disc.Start(ps); err != nil {
return nil, err
Expand Down Expand Up @@ -533,6 +539,14 @@ func WithSeenMessagesTTL(ttl time.Duration) Option {
}
}

// WithSeenMessagesStrategy configures which type of lookup/cleanup strategy is used by the seen messages cache
func WithSeenMessagesStrategy(strategy timecache.Strategy) Option {
return func(ps *PubSub) error {
ps.seenMsgStrategy = strategy
return nil
}
}

// WithAppSpecificRpcInspector sets a hook that inspect incomings RPCs prior to
// processing them. The inspector is invoked on an accepted RPC just before it
// is handled. If inspector's error is nil, the RPC is handled. Otherwise, it
Expand Down
71 changes: 71 additions & 0 deletions timecache/first_seen_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package timecache

import (
"container/list"
"sync"
"time"
)

// FirstSeenCache is a thread-safe copy of https://github.com/whyrusleeping/timecache.
type FirstSeenCache struct {
q *list.List
m map[string]time.Time
span time.Duration
guard *sync.RWMutex
}

func newFirstSeenCache(span time.Duration) TimeCache {
return &FirstSeenCache{
q: list.New(),
m: make(map[string]time.Time),
span: span,
guard: new(sync.RWMutex),
}
}

func (tc FirstSeenCache) Add(s string) {
tc.guard.Lock()
defer tc.guard.Unlock()

_, ok := tc.m[s]
if ok {
panic("putting the same entry twice not supported")
}

// TODO(#515): Do GC in the background
tc.sweep()
smrz2001 marked this conversation as resolved.
Show resolved Hide resolved

tc.m[s] = time.Now()
tc.q.PushFront(s)
}

func (tc FirstSeenCache) sweep() {
for {
back := tc.q.Back()
if back == nil {
return
}

v := back.Value.(string)
t, ok := tc.m[v]
if !ok {
panic("inconsistent cache state")
}

if time.Since(t) > tc.span {
tc.q.Remove(back)
delete(tc.m, v)
} else {
return
}
}
}

func (tc FirstSeenCache) Has(s string) bool {
tc.guard.RLock()
defer tc.guard.RUnlock()

ts, ok := tc.m[s]
// Only consider the entry found if it was present in the cache AND hadn't already expired.
return ok && time.Since(ts) <= tc.span
smrz2001 marked this conversation as resolved.
Show resolved Hide resolved
}
39 changes: 39 additions & 0 deletions timecache/first_seen_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package timecache

import (
"fmt"
"testing"
"time"
)

func TestFirstSeenCacheFound(t *testing.T) {
tc := newFirstSeenCache(time.Minute)

tc.Add("test")

if !tc.Has("test") {
t.Fatal("should have this key")
}
}

func TestFirstSeenCacheExpire(t *testing.T) {
tc := newFirstSeenCache(time.Second)
for i := 0; i < 11; i++ {
tc.Add(fmt.Sprint(i))
time.Sleep(time.Millisecond * 100)
}

if tc.Has(fmt.Sprint(0)) {
t.Fatal("should have dropped this from the cache already")
}
}

func TestFirstSeenCacheNotFoundAfterExpire(t *testing.T) {
tc := newFirstSeenCache(time.Second)
tc.Add(fmt.Sprint(0))
time.Sleep(1100 * time.Millisecond)

if tc.Has(fmt.Sprint(0)) {
t.Fatal("should have dropped this from the cache already")
}
}
84 changes: 84 additions & 0 deletions timecache/last_seen_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package timecache

import (
"sync"
"time"

"github.com/emirpasic/gods/maps/linkedhashmap"
)

// LastSeenCache is a LRU cache that keeps entries for up to a specified time duration. After this duration has elapsed,
// "old" entries will be purged from the cache.
//
// It's also a "sliding window" cache. Every time an unexpired entry is seen again, its timestamp slides forward. This
// keeps frequently occurring entries cached and prevents them from being propagated, especially because of network
// issues that might increase the number of duplicate messages in the network.
//
// Garbage collection of expired entries is event-driven, i.e. it only happens when there is a new entry added to the
// cache. This should be ok - if existing entries are being looked up then the cache is not growing, and when a new one
// appears that would grow the cache, garbage collection will attempt to reduce the pressure on the cache.
//
// This implementation is heavily inspired by https://github.com/whyrusleeping/timecache.
type LastSeenCache struct {
m *linkedhashmap.Map
span time.Duration
guard *sync.Mutex
}

func newLastSeenCache(span time.Duration) TimeCache {
return &LastSeenCache{
m: linkedhashmap.New(),
span: span,
guard: new(sync.Mutex),
}
}

func (tc *LastSeenCache) Add(s string) {
tc.guard.Lock()
defer tc.guard.Unlock()

tc.add(s)

// Garbage collect expired entries
// TODO(#515): Do GC in the background
tc.gc()
smrz2001 marked this conversation as resolved.
Show resolved Hide resolved
}

func (tc *LastSeenCache) add(s string) {
// We don't need a lock here because this function is always called with the lock already acquired.

// If an entry already exists, remove it and add a new one to the back of the list to maintain temporal ordering and
// an accurate sliding window.
tc.m.Remove(s)
now := time.Now()
tc.m.Put(s, &now)
}

func (tc *LastSeenCache) gc() {
// We don't need a lock here because this function is always called with the lock already acquired.
iter := tc.m.Iterator()
for iter.Next() {
key := iter.Key()
ts := iter.Value().(*time.Time)
// Exit if we've found an entry with an unexpired timestamp. Since we're iterating in order of insertion, all
// entries hereafter will be unexpired.
if time.Since(*ts) <= tc.span {
return
}
tc.m.Remove(key)
}
}

func (tc *LastSeenCache) Has(s string) bool {
tc.guard.Lock()
vyzo marked this conversation as resolved.
Show resolved Hide resolved
defer tc.guard.Unlock()

// If the entry exists and has not already expired, slide it forward.
if ts, found := tc.m.Get(s); found {
if t := ts.(*time.Time); time.Since(*t) <= tc.span {
tc.add(s)
return true
}
}
return false
}
Loading