From 89644b7d3bab921e9423ef60f827239f29872edd Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Tue, 21 Feb 2023 12:49:57 -0800 Subject: [PATCH 1/7] Refactor relay_finder and start autorelay after identify --- config/config.go | 4 +- p2p/host/autorelay/autorelay.go | 5 +- p2p/host/autorelay/autorelay_test.go | 102 +++++++---------- p2p/host/autorelay/host.go | 4 + p2p/host/autorelay/relay_finder.go | 158 +++++++++++++++++++-------- p2p/host/autorelay/timer.go | 42 ------- 6 files changed, 162 insertions(+), 153 deletions(-) delete mode 100644 p2p/host/autorelay/timer.go diff --git a/config/config.go b/config/config.go index fc16989417..d3824c2a9f 100644 --- a/config/config.go +++ b/config/config.go @@ -433,7 +433,9 @@ func (cfg *Config) NewNode() (host.Host, error) { ho = routed.Wrap(h, router) } if ar != nil { - return autorelay.NewAutoRelayHost(ho, ar), nil + arh := autorelay.NewAutoRelayHost(ho, ar) + arh.Start() + ho = arh } return ho, nil } diff --git a/p2p/host/autorelay/autorelay.go b/p2p/host/autorelay/autorelay.go index 58cebfb431..fc46a2cc53 100644 --- a/p2p/host/autorelay/autorelay.go +++ b/p2p/host/autorelay/autorelay.go @@ -49,12 +49,15 @@ func NewAutoRelay(bhost *basic.BasicHost, opts ...Option) (*AutoRelay, error) { r.relayFinder = newRelayFinder(bhost, conf.peerSource, &conf) bhost.AddrsFactory = r.hostAddrs + return r, nil +} + +func (r *AutoRelay) Start() { r.refCount.Add(1) go func() { defer r.refCount.Done() r.background() }() - return r, nil } func (r *AutoRelay) background() { diff --git a/p2p/host/autorelay/autorelay_test.go b/p2p/host/autorelay/autorelay_test.go index 2a03fd5dc1..04c39a1e58 100644 --- a/p2p/host/autorelay/autorelay_test.go +++ b/p2p/host/autorelay/autorelay_test.go @@ -2,14 +2,12 @@ package autorelay_test import ( "context" - "os" "strings" "sync/atomic" "testing" "time" "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -97,7 +95,7 @@ func newRelay(t *testing.T) host.Host { } } return false - }, 500*time.Millisecond, 10*time.Millisecond) + }, time.Second, 10*time.Millisecond) return h } @@ -121,7 +119,7 @@ func TestSingleCandidate(t *testing.T) { ) defer h.Close() - require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond) + require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 100*time.Millisecond) // test that we don't add any more relays require.Never(t, func() bool { return numRelays(h) > 1 }, 200*time.Millisecond, 50*time.Millisecond) require.Equal(t, 1, counter, "expected the peer source callback to only have been called once") @@ -179,7 +177,7 @@ func TestWaitForCandidates(t *testing.T) { r2 := newRelay(t) t.Cleanup(func() { r2.Close() }) peerChan <- peer.AddrInfo{ID: r2.ID(), Addrs: r2.Addrs()} - require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond) + require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 100*time.Millisecond) } func TestBackoff(t *testing.T) { @@ -225,15 +223,19 @@ func TestBackoff(t *testing.T) { ) defer h.Close() - require.Eventually(t, func() bool { return reservations.Load() == 1 }, 3*time.Second, 20*time.Millisecond) + require.Eventually(t, func() bool { + return reservations.Load() == 1 + }, 10*time.Second, 20*time.Millisecond, "reservations load should be 1 was %d", reservations.Load()) // make sure we don't add any 
relays yet for i := 0; i < 2; i++ { cl.Add(backoff / 3) require.Equal(t, 1, int(reservations.Load())) } - cl.Add(backoff / 2) - require.Eventually(t, func() bool { return reservations.Load() == 2 }, 3*time.Second, 20*time.Millisecond) - require.Less(t, int(counter.Load()), 100) // just make sure we're not busy-looping + cl.Add(backoff) + require.Eventually(t, func() bool { + return reservations.Load() == 2 + }, 10*time.Second, 100*time.Millisecond, "reservations load should be 2 was %d", reservations.Load()) + require.Less(t, int(counter.Load()), 300) // just make sure we're not busy-looping require.Equal(t, 2, int(reservations.Load())) } @@ -252,7 +254,7 @@ func TestStaticRelays(t *testing.T) { ) defer h.Close() - require.Eventually(t, func() bool { return numRelays(h) > 0 }, 2*time.Second, 50*time.Millisecond) + require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 50*time.Millisecond) } func TestConnectOnDisconnect(t *testing.T) { @@ -275,7 +277,7 @@ func TestConnectOnDisconnect(t *testing.T) { ) defer h.Close() - require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond) + require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 100*time.Millisecond) relaysInUse := usedRelays(h) require.Len(t, relaysInUse, 1) oldRelay := relaysInUse[0] @@ -286,7 +288,7 @@ func TestConnectOnDisconnect(t *testing.T) { } } - require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond) + require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 100*time.Millisecond) relaysInUse = usedRelays(h) require.Len(t, relaysInUse, 1) require.NotEqualf(t, oldRelay, relaysInUse[0], "old relay should not be used again") @@ -332,28 +334,31 @@ func TestMaxAge(t *testing.T) { ) defer h.Close() - require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond) + require.Eventually(t, func() bool { + return numRelays(h) > 0 + }, 10*time.Second, 100*time.Millisecond) relays := usedRelays(h) require.Len(t, relays, 1) - waitFor := 500 * time.Millisecond - tick := 100 * time.Millisecond - if os.Getenv("CI") != "" { - // Only increase the waitFor since we are increasing the mock clock every tick. - waitFor *= 10 - } require.Eventually(t, func() bool { // we don't know exactly when the timer is reset, just advance our timer multiple times if necessary - cl.Add(time.Second) + cl.Add(30 * time.Second) return len(peerChans) == 0 - }, waitFor, tick) + }, 10*time.Second, 100*time.Millisecond) cl.Add(10 * time.Minute) for _, r := range relays2 { peerChan2 <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()} } cl.Add(11 * time.Minute) + + require.Eventually(t, func() bool { + relays = usedRelays(h) + return len(relays) == 1 + }, 10*time.Second, 100*time.Millisecond) + // by now the 3 relays should have been garbage collected + // And we should only be using a single relay. Lets close it. 
var oldRelay peer.ID for _, r := range relays1 { if r.ID() == relays[0] { @@ -369,7 +374,7 @@ func TestMaxAge(t *testing.T) { return false } return relays[0] != oldRelay - }, 3*time.Second, 100*time.Millisecond) + }, 10*time.Second, 100*time.Millisecond) require.Len(t, relays, 1) ids := make([]peer.ID, 0, len(relays2)) @@ -379,40 +384,6 @@ func TestMaxAge(t *testing.T) { require.Contains(t, ids, relays[0]) } -func expectDeltaInAddrUpdated(t *testing.T, addrUpdated event.Subscription, expectedDelta int) { - t.Helper() - delta := 0 - for { - select { - case evAny := <-addrUpdated.Out(): - ev := evAny.(event.EvtLocalAddressesUpdated) - for _, updatedAddr := range ev.Removed { - if updatedAddr.Action == event.Removed { - if _, err := updatedAddr.Address.ValueForProtocol(ma.P_CIRCUIT); err == nil { - delta-- - if delta == expectedDelta { - return - } - } - } - } - for _, updatedAddr := range ev.Current { - if updatedAddr.Action == event.Added { - if _, err := updatedAddr.Address.ValueForProtocol(ma.P_CIRCUIT); err == nil { - delta++ - if delta == expectedDelta { - return - } - } - } - } - case <-time.After(10 * time.Second): - t.Fatal("timeout waiting for address updated event") - } - } - -} - func TestReconnectToStaticRelays(t *testing.T) { cl := clock.NewMock() var staticRelays []peer.AddrInfo @@ -428,16 +399,14 @@ func TestReconnectToStaticRelays(t *testing.T) { h := newPrivateNodeWithStaticRelays(t, staticRelays, autorelay.WithClock(cl), + autorelay.WithBackoff(30*time.Minute), ) - defer h.Close() - addrUpdated, err := h.EventBus().Subscribe(new(event.EvtLocalAddressesUpdated)) - require.NoError(t, err) - - expectDeltaInAddrUpdated(t, addrUpdated, 1) - cl.Add(time.Minute) + require.Eventually(t, func() bool { + return numRelays(h) == 1 + }, 10*time.Second, 100*time.Millisecond) relaysInUse := usedRelays(h) oldRelay := relaysInUse[0] @@ -446,9 +415,14 @@ func TestReconnectToStaticRelays(t *testing.T) { r.Network().ClosePeer(h.ID()) } } + require.Eventually(t, func() bool { + return numRelays(h) == 0 + }, 10*time.Second, 100*time.Millisecond) cl.Add(time.Hour) - expectDeltaInAddrUpdated(t, addrUpdated, -1) + require.Eventually(t, func() bool { + return numRelays(h) == 1 + }, 10*time.Second, 100*time.Millisecond) } func TestMinInterval(t *testing.T) { @@ -470,5 +444,5 @@ func TestMinInterval(t *testing.T) { // The second call to peerSource should happen after 1 second require.Never(t, func() bool { return numRelays(h) > 0 }, 500*time.Millisecond, 100*time.Millisecond) - require.Eventually(t, func() bool { return numRelays(h) > 0 }, 1*time.Second, 100*time.Millisecond) + require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 100*time.Millisecond) } diff --git a/p2p/host/autorelay/host.go b/p2p/host/autorelay/host.go index 740ca23621..c6bd9c5706 100644 --- a/p2p/host/autorelay/host.go +++ b/p2p/host/autorelay/host.go @@ -14,6 +14,10 @@ func (h *AutoRelayHost) Close() error { return h.Host.Close() } +func (h *AutoRelayHost) Start() { + h.ar.Start() +} + func NewAutoRelayHost(h host.Host, ar *AutoRelay) *AutoRelayHost { return &AutoRelayHost{Host: h, ar: ar} } diff --git a/p2p/host/autorelay/relay_finder.go b/p2p/host/autorelay/relay_finder.go index 5c6e903404..2d6ed3b744 100644 --- a/p2p/host/autorelay/relay_finder.go +++ b/p2p/host/autorelay/relay_finder.go @@ -10,6 +10,7 @@ import ( "golang.org/x/sync/errgroup" + "github.com/benbjohnson/clock" "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/network" 
"github.com/libp2p/go-libp2p/core/peer" @@ -99,11 +100,22 @@ func newRelayFinder(host *basic.BasicHost, peerSource PeerSource, conf *config) } } +type scheduledWorkTimes struct { + nextRefresh time.Time + nextBackoff time.Time + nextOldCandidateCheck time.Time + nextAllowedCallToPeerSource time.Time + peerSourceTimer *clock.Timer + needMorePeers bool +} + func (rf *relayFinder) background(ctx context.Context) { + needMorePeersChan := make(chan struct{}, 1) + canCallPeerSourceChan := make(chan struct{}, 1) rf.refCount.Add(1) go func() { defer rf.refCount.Done() - rf.findNodes(ctx) + rf.findNodes(ctx, needMorePeersChan, canCallPeerSourceChan) }() rf.refCount.Add(1) @@ -119,6 +131,8 @@ func (rf *relayFinder) background(ctx context.Context) { } defer subConnectedness.Close() + peerSourceTimer := rf.conf.clock.Timer(rf.conf.minInterval) + defer peerSourceTimer.Stop() bootDelayTimer := rf.conf.clock.Timer(rf.conf.bootDelay) defer bootDelayTimer.Stop() refreshTicker := rf.conf.clock.Ticker(rsvpRefreshInterval) @@ -128,6 +142,15 @@ func (rf *relayFinder) background(ctx context.Context) { oldCandidateTicker := rf.conf.clock.Ticker(rf.conf.maxCandidateAge / 5) defer oldCandidateTicker.Stop() + scheduledWork := &scheduledWorkTimes{ + nextRefresh: rf.conf.clock.Now().Add(rsvpRefreshInterval), + nextBackoff: rf.conf.clock.Now().Add(rf.conf.backoff / 5), + nextOldCandidateCheck: rf.conf.clock.Now().Add(rf.conf.maxCandidateAge / 5), + nextAllowedCallToPeerSource: rf.conf.clock.Now().Add(-time.Second), // allow immediately + peerSourceTimer: peerSourceTimer, + needMorePeers: true, + } + for { // when true, we need to identify push var push bool @@ -156,31 +179,18 @@ func (rf *relayFinder) background(ctx context.Context) { rf.notifyMaybeConnectToRelay() case <-rf.relayUpdated: push = true - case now := <-refreshTicker.C: - push = rf.refreshReservations(ctx, now) - case now := <-backoffTicker.C: - rf.candidateMx.Lock() - for id, t := range rf.backoff { - if !t.Add(rf.conf.backoff).After(now) { - log.Debugw("removing backoff for node", "id", id) - delete(rf.backoff, id) - } - } - rf.candidateMx.Unlock() - case now := <-oldCandidateTicker.C: - var deleted bool - rf.candidateMx.Lock() - for id, cand := range rf.candidates { - if !cand.added.Add(rf.conf.maxCandidateAge).After(now) { - deleted = true - log.Debugw("deleting candidate due to age", "id", id) - delete(rf.candidates, id) - } - } - rf.candidateMx.Unlock() - if deleted { - rf.notifyMaybeNeedNewCandidates() - } + case <-refreshTicker.C: + push = rf.runScheduledWork(ctx, rf.conf.clock.Now(), scheduledWork, canCallPeerSourceChan) + case <-backoffTicker.C: + log.Debugf("backoff ticker fired") + push = rf.runScheduledWork(ctx, rf.conf.clock.Now(), scheduledWork, canCallPeerSourceChan) + case <-oldCandidateTicker.C: + push = rf.runScheduledWork(ctx, rf.conf.clock.Now(), scheduledWork, canCallPeerSourceChan) + case <-peerSourceTimer.C: + push = rf.runScheduledWork(ctx, rf.conf.clock.Now(), scheduledWork, canCallPeerSourceChan) + case <-needMorePeersChan: + scheduledWork.needMorePeers = true + push = rf.runScheduledWork(ctx, rf.conf.clock.Now(), scheduledWork, canCallPeerSourceChan) case <-ctx.Done(): return } @@ -194,41 +204,91 @@ func (rf *relayFinder) background(ctx context.Context) { } } +func (rf *relayFinder) runScheduledWork(ctx context.Context, now time.Time, scheduledWork *scheduledWorkTimes, canCallPeerSourceChan chan<- struct{}) bool { + var push bool + + if now.After(scheduledWork.nextRefresh) { + scheduledWork.nextRefresh = 
now.Add(rsvpRefreshInterval) + push = rf.refreshReservations(ctx, now) + } + + if now.After(scheduledWork.nextBackoff) { + scheduledWork.nextBackoff = now.Add(rf.conf.backoff / 5) + rf.candidateMx.Lock() + for id, t := range rf.backoff { + if !t.Add(rf.conf.backoff).After(now) { + log.Debugw("removing backoff for node", "id", id) + delete(rf.backoff, id) + } + } + rf.candidateMx.Unlock() + + } + + if now.After(scheduledWork.nextOldCandidateCheck) { + scheduledWork.nextOldCandidateCheck = now.Add(rf.conf.maxCandidateAge / 5) + var deleted bool + rf.candidateMx.Lock() + for id, cand := range rf.candidates { + if !cand.added.Add(rf.conf.maxCandidateAge).After(now) { + deleted = true + log.Debugw("deleting candidate due to age", "id", id) + delete(rf.candidates, id) + } + } + rf.candidateMx.Unlock() + if deleted { + rf.notifyMaybeNeedNewCandidates() + } + } + + if scheduledWork.needMorePeers && now.After(scheduledWork.nextAllowedCallToPeerSource) { + scheduledWork.nextAllowedCallToPeerSource = now.Add(rf.conf.minInterval) + scheduledWork.needMorePeers = false + + if !scheduledWork.peerSourceTimer.Stop() { + // Maybe drain the channel + select { + case <-scheduledWork.peerSourceTimer.C: + default: + } + } + scheduledWork.peerSourceTimer.Reset(scheduledWork.nextAllowedCallToPeerSource.Sub(now)) + + select { + case canCallPeerSourceChan <- struct{}{}: + default: + } + } + + return push +} + // findNodes accepts nodes from the channel and tests if they support relaying. // It is run on both public and private nodes. // It garbage collects old entries, so that nodes doesn't overflow. // This makes sure that as soon as we need to find relay candidates, we have them available. -func (rf *relayFinder) findNodes(ctx context.Context) { - peerChan := rf.peerSource(ctx, rf.conf.maxCandidates) +func (rf *relayFinder) findNodes(ctx context.Context, needMorePeersChan chan<- struct{}, canCallPeerSource <-chan struct{}) { + var peerChan <-chan peer.AddrInfo var wg sync.WaitGroup - lastCallToPeerSource := rf.conf.clock.Now() - - timer := newTimer(rf.conf.clock) for { rf.candidateMx.Lock() numCandidates := len(rf.candidates) rf.candidateMx.Unlock() - if peerChan == nil { - now := rf.conf.clock.Now() - nextAllowedCallToPeerSource := lastCallToPeerSource.Add(rf.conf.minInterval).Sub(now) - if numCandidates < rf.conf.minCandidates { - log.Debugw("not enough candidates. Resetting timer", "num", numCandidates, "desired", rf.conf.minCandidates) - timer.Reset(nextAllowedCallToPeerSource) + if peerChan == nil && numCandidates < rf.conf.minCandidates { + select { + case needMorePeersChan <- struct{}{}: + default: } } select { case <-rf.maybeRequestNewCandidates: continue - case now := <-timer.Chan(): - timer.SetRead() - if peerChan != nil { - // We're still reading peers from the peerChan. No need to query for more peers now. - continue - } - lastCallToPeerSource = now + case <-canCallPeerSource: peerChan = rf.peerSource(ctx, rf.conf.maxCandidates) + continue case pi, ok := <-peerChan: if !ok { wg.Wait() @@ -527,12 +587,20 @@ func (rf *relayFinder) usingRelay(p peer.ID) bool { return ok } +func (rf *relayFinder) candidateOk(c *candidate) bool { + now := rf.conf.clock.Now() + // Check max age here as well in case our background ticker hasn't fired. + return c.added.Add(rf.conf.maxCandidateAge).After(now) +} + // selectCandidates returns an ordered slice of relay candidates. // Callers should attempt to obtain reservations with the candidates in this order. 
func (rf *relayFinder) selectCandidates() []*candidate { candidates := make([]*candidate, 0, len(rf.candidates)) for _, cand := range rf.candidates { - candidates = append(candidates, cand) + if rf.candidateOk(cand) { + candidates = append(candidates, cand) + } } // TODO: better relay selection strategy; this just selects random relays, diff --git a/p2p/host/autorelay/timer.go b/p2p/host/autorelay/timer.go deleted file mode 100644 index b554455302..0000000000 --- a/p2p/host/autorelay/timer.go +++ /dev/null @@ -1,42 +0,0 @@ -package autorelay - -import ( - "time" - - "github.com/benbjohnson/clock" -) - -type timer struct { - timer *clock.Timer - running bool - read bool -} - -func newTimer(cl clock.Clock) *timer { - t := cl.Timer(100 * time.Hour) // There's no way to initialize a stopped timer - t.Stop() - return &timer{timer: t} -} - -func (t *timer) Chan() <-chan time.Time { - return t.timer.C -} - -func (t *timer) Stop() { - if !t.running { - return - } - if !t.timer.Stop() && !t.read { - <-t.timer.C - } - t.read = false -} - -func (t *timer) SetRead() { - t.read = true -} - -func (t *timer) Reset(d time.Duration) { - t.Stop() - t.timer.Reset(d) -} From 645ae40826cbb0ee0b54aaa2dd29a26be4da29a3 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Tue, 21 Feb 2023 14:05:48 -0800 Subject: [PATCH 2/7] Clock fork --- go.mod | 2 ++ go.sum | 5 ++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 21b8459561..517f1e9850 100644 --- a/go.mod +++ b/go.mod @@ -115,3 +115,5 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/blake3 v1.1.7 // indirect ) + +replace github.com/benbjohnson/clock => github.com/marcopolo/clock v1.4.0 diff --git a/go.sum b/go.sum index b73b9e638c..2be08bd773 100644 --- a/go.sum +++ b/go.sum @@ -51,9 +51,6 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= -github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= -github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= -github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -341,6 +338,8 @@ github.com/libp2p/zeroconf/v2 v2.2.0/go.mod h1:fuJqLnUwZTshS3U/bMRJ3+ow/v9oid1n0 github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= +github.com/marcopolo/clock v1.4.0 h1:kL3wgpJFTB1Zm6SQJkLQMwRV5zPqnPSgz7Lw37H83OI= +github.com/marcopolo/clock v1.4.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd h1:br0buuQ854V8u83wA0rVZ8ttrq5CpaPZdvrK0LP2lOk= 
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd/go.mod h1:QuCEs1Nt24+FYQEqAAncTDPJIuGs+LxK1MCiFL25pMU= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= From 0ab01216514274524839257bfcfa1b05e88bee60 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Wed, 22 Feb 2023 17:28:10 -0800 Subject: [PATCH 3/7] Remove multiple timers and use a single rate limiting chan for findNodes --- p2p/host/autorelay/relay_finder.go | 223 +++++++++++++++++------------ 1 file changed, 128 insertions(+), 95 deletions(-) diff --git a/p2p/host/autorelay/relay_finder.go b/p2p/host/autorelay/relay_finder.go index 2d6ed3b744..8400c29efc 100644 --- a/p2p/host/autorelay/relay_finder.go +++ b/p2p/host/autorelay/relay_finder.go @@ -10,7 +10,6 @@ import ( "golang.org/x/sync/errgroup" - "github.com/benbjohnson/clock" "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -101,21 +100,19 @@ func newRelayFinder(host *basic.BasicHost, peerSource PeerSource, conf *config) } type scheduledWorkTimes struct { + leastFrequentInterval time.Duration nextRefresh time.Time nextBackoff time.Time nextOldCandidateCheck time.Time nextAllowedCallToPeerSource time.Time - peerSourceTimer *clock.Timer - needMorePeers bool } func (rf *relayFinder) background(ctx context.Context) { - needMorePeersChan := make(chan struct{}, 1) - canCallPeerSourceChan := make(chan struct{}, 1) + peerSourceRateLimiter := make(chan struct{}, 1) rf.refCount.Add(1) go func() { defer rf.refCount.Done() - rf.findNodes(ctx, needMorePeersChan, canCallPeerSourceChan) + rf.findNodes(ctx, peerSourceRateLimiter) }() rf.refCount.Add(1) @@ -131,30 +128,35 @@ func (rf *relayFinder) background(ctx context.Context) { } defer subConnectedness.Close() - peerSourceTimer := rf.conf.clock.Timer(rf.conf.minInterval) - defer peerSourceTimer.Stop() bootDelayTimer := rf.conf.clock.Timer(rf.conf.bootDelay) defer bootDelayTimer.Stop() - refreshTicker := rf.conf.clock.Ticker(rsvpRefreshInterval) - defer refreshTicker.Stop() - backoffTicker := rf.conf.clock.Ticker(rf.conf.backoff / 5) - defer backoffTicker.Stop() - oldCandidateTicker := rf.conf.clock.Ticker(rf.conf.maxCandidateAge / 5) - defer oldCandidateTicker.Stop() + + // This is the least frequent event. It's our fallback timer if we don't have any other work to do. 
+ leastFrequentInterval := rf.conf.minInterval + if rf.conf.backoff > leastFrequentInterval { + leastFrequentInterval = rf.conf.backoff + } + if rf.conf.maxCandidateAge > leastFrequentInterval { + leastFrequentInterval = rf.conf.maxCandidateAge + } + if rsvpRefreshInterval > leastFrequentInterval { + leastFrequentInterval = rf.conf.maxCandidateAge + } + + now := rf.conf.clock.Now() scheduledWork := &scheduledWorkTimes{ - nextRefresh: rf.conf.clock.Now().Add(rsvpRefreshInterval), - nextBackoff: rf.conf.clock.Now().Add(rf.conf.backoff / 5), - nextOldCandidateCheck: rf.conf.clock.Now().Add(rf.conf.maxCandidateAge / 5), - nextAllowedCallToPeerSource: rf.conf.clock.Now().Add(-time.Second), // allow immediately - peerSourceTimer: peerSourceTimer, - needMorePeers: true, + leastFrequentInterval: leastFrequentInterval, + nextRefresh: now.Add(rsvpRefreshInterval), + nextBackoff: now.Add(rf.conf.backoff / 5), + nextOldCandidateCheck: now.Add(rf.conf.maxCandidateAge / 5), + nextAllowedCallToPeerSource: now.Add(-time.Second), // allow immediately } - for { - // when true, we need to identify push - var push bool + workTimer := rf.conf.clock.Timer(rf.runScheduledWork(ctx, now, scheduledWork, peerSourceRateLimiter).Sub(now)) + defer workTimer.Stop() + for { select { case ev, ok := <-subConnectedness.Out(): if !ok { @@ -164,6 +166,8 @@ func (rf *relayFinder) background(ctx context.Context) { if evt.Connectedness != network.NotConnected { continue } + push := false + rf.relayMx.Lock() if rf.usingRelay(evt.Peer) { // we were disconnected from a relay log.Debugw("disconnected from relay", "id", evt.Peer) @@ -173,102 +177,137 @@ func (rf *relayFinder) background(ctx context.Context) { push = true } rf.relayMx.Unlock() + + if push { + rf.clearCachedAddrsAndIdentifyPush() + } case <-rf.candidateFound: rf.notifyMaybeConnectToRelay() case <-bootDelayTimer.C: rf.notifyMaybeConnectToRelay() case <-rf.relayUpdated: - push = true - case <-refreshTicker.C: - push = rf.runScheduledWork(ctx, rf.conf.clock.Now(), scheduledWork, canCallPeerSourceChan) - case <-backoffTicker.C: - log.Debugf("backoff ticker fired") - push = rf.runScheduledWork(ctx, rf.conf.clock.Now(), scheduledWork, canCallPeerSourceChan) - case <-oldCandidateTicker.C: - push = rf.runScheduledWork(ctx, rf.conf.clock.Now(), scheduledWork, canCallPeerSourceChan) - case <-peerSourceTimer.C: - push = rf.runScheduledWork(ctx, rf.conf.clock.Now(), scheduledWork, canCallPeerSourceChan) - case <-needMorePeersChan: - scheduledWork.needMorePeers = true - push = rf.runScheduledWork(ctx, rf.conf.clock.Now(), scheduledWork, canCallPeerSourceChan) + rf.clearCachedAddrsAndIdentifyPush() + case <-workTimer.C: + now := rf.conf.clock.Now() + nextTime := rf.runScheduledWork(ctx, now, scheduledWork, peerSourceRateLimiter) + workTimer.Reset(nextTime.Sub(now)) case <-ctx.Done(): return } - - if push { - rf.relayMx.Lock() - rf.cachedAddrs = nil - rf.relayMx.Unlock() - rf.host.SignalAddressChange() - } } } -func (rf *relayFinder) runScheduledWork(ctx context.Context, now time.Time, scheduledWork *scheduledWorkTimes, canCallPeerSourceChan chan<- struct{}) bool { - var push bool +func (rf *relayFinder) clearCachedAddrsAndIdentifyPush() { + rf.relayMx.Lock() + rf.cachedAddrs = nil + rf.relayMx.Unlock() + rf.host.SignalAddressChange() +} + +func (rf *relayFinder) runScheduledWork(ctx context.Context, now time.Time, scheduledWork *scheduledWorkTimes, peerSourceRateLimiter chan<- struct{}) time.Time { + nextTime := now.Add(scheduledWork.leastFrequentInterval) if 
now.After(scheduledWork.nextRefresh) { scheduledWork.nextRefresh = now.Add(rsvpRefreshInterval) - push = rf.refreshReservations(ctx, now) + if rf.refreshReservations(ctx, now) { + rf.clearCachedAddrsAndIdentifyPush() + } } if now.After(scheduledWork.nextBackoff) { - scheduledWork.nextBackoff = now.Add(rf.conf.backoff / 5) - rf.candidateMx.Lock() - for id, t := range rf.backoff { - if !t.Add(rf.conf.backoff).After(now) { - log.Debugw("removing backoff for node", "id", id) - delete(rf.backoff, id) - } - } - rf.candidateMx.Unlock() - + scheduledWork.nextBackoff = rf.clearBackoff(now) } if now.After(scheduledWork.nextOldCandidateCheck) { - scheduledWork.nextOldCandidateCheck = now.Add(rf.conf.maxCandidateAge / 5) - var deleted bool - rf.candidateMx.Lock() - for id, cand := range rf.candidates { - if !cand.added.Add(rf.conf.maxCandidateAge).After(now) { - deleted = true - log.Debugw("deleting candidate due to age", "id", id) - delete(rf.candidates, id) - } - } - rf.candidateMx.Unlock() - if deleted { - rf.notifyMaybeNeedNewCandidates() - } + scheduledWork.nextOldCandidateCheck = rf.clearOldCandidates(now) } - if scheduledWork.needMorePeers && now.After(scheduledWork.nextAllowedCallToPeerSource) { + if now.After(scheduledWork.nextAllowedCallToPeerSource) { scheduledWork.nextAllowedCallToPeerSource = now.Add(rf.conf.minInterval) - scheduledWork.needMorePeers = false + select { + case peerSourceRateLimiter <- struct{}{}: + default: + } + } - if !scheduledWork.peerSourceTimer.Stop() { - // Maybe drain the channel - select { - case <-scheduledWork.peerSourceTimer.C: - default: + // Find the next time we need to run scheduled work. + if scheduledWork.nextRefresh.Before(nextTime) { + nextTime = scheduledWork.nextRefresh + } + if scheduledWork.nextBackoff.Before(nextTime) { + nextTime = scheduledWork.nextBackoff + } + if scheduledWork.nextOldCandidateCheck.Before(nextTime) { + nextTime = scheduledWork.nextOldCandidateCheck + } + if scheduledWork.nextAllowedCallToPeerSource.Before(nextTime) { + nextTime = scheduledWork.nextAllowedCallToPeerSource + } + if nextTime == now { + // Only happens in CI with a mock clock + nextTime = nextTime.Add(1) // avoids an infinite loop + } + + return nextTime +} + +// clearOldCandidates clears old candidates from the map. Returns the next time +// to run this function. +func (rf *relayFinder) clearOldCandidates(now time.Time) time.Time { + // If we don't have any candidates, we should run this again in rf.conf.maxCandidateAge. + nextTime := now.Add(rf.conf.maxCandidateAge) + + var deleted bool + rf.candidateMx.Lock() + defer rf.candidateMx.Unlock() + for id, cand := range rf.candidates { + expiry := cand.added.Add(rf.conf.maxCandidateAge) + if expiry.After(now) { + if expiry.Before(nextTime) { + nextTime = expiry } + } else { + deleted = true + log.Debugw("deleting candidate due to age", "id", id) + delete(rf.candidates, id) + } - scheduledWork.peerSourceTimer.Reset(scheduledWork.nextAllowedCallToPeerSource.Sub(now)) + } + if deleted { + rf.notifyMaybeNeedNewCandidates() + } - select { - case canCallPeerSourceChan <- struct{}{}: - default: + return nextTime +} + +// clearBackoff clears old backoff entries from the map. Returns the next time +// to run this function. 
+func (rf *relayFinder) clearBackoff(now time.Time) time.Time { + nextTime := now.Add(rf.conf.backoff) + + rf.candidateMx.Lock() + defer rf.candidateMx.Unlock() + for id, t := range rf.backoff { + expiry := t.Add(rf.conf.backoff) + if expiry.After(now) { + if expiry.Before(nextTime) { + nextTime = expiry + } + } else { + log.Debugw("removing backoff for node", "id", id) + delete(rf.backoff, id) } } - return push + return nextTime } // findNodes accepts nodes from the channel and tests if they support relaying. // It is run on both public and private nodes. // It garbage collects old entries, so that nodes doesn't overflow. // This makes sure that as soon as we need to find relay candidates, we have them available. -func (rf *relayFinder) findNodes(ctx context.Context, needMorePeersChan chan<- struct{}, canCallPeerSource <-chan struct{}) { +// peerSourceRateLimiter is used to limit how often we call the peer source. +func (rf *relayFinder) findNodes(ctx context.Context, peerSourceRateLimiter <-chan struct{}) { var peerChan <-chan peer.AddrInfo var wg sync.WaitGroup for { @@ -278,17 +317,16 @@ func (rf *relayFinder) findNodes(ctx context.Context, needMorePeersChan chan<- s if peerChan == nil && numCandidates < rf.conf.minCandidates { select { - case needMorePeersChan <- struct{}{}: - default: + case <-peerSourceRateLimiter: + peerChan = rf.peerSource(ctx, rf.conf.maxCandidates) + case <-ctx.Done(): + return } } select { case <-rf.maybeRequestNewCandidates: continue - case <-canCallPeerSource: - peerChan = rf.peerSource(ctx, rf.conf.maxCandidates) - continue case pi, ok := <-peerChan: if !ok { wg.Wait() @@ -587,18 +625,13 @@ func (rf *relayFinder) usingRelay(p peer.ID) bool { return ok } -func (rf *relayFinder) candidateOk(c *candidate) bool { - now := rf.conf.clock.Now() - // Check max age here as well in case our background ticker hasn't fired. - return c.added.Add(rf.conf.maxCandidateAge).After(now) -} - // selectCandidates returns an ordered slice of relay candidates. // Callers should attempt to obtain reservations with the candidates in this order. 
func (rf *relayFinder) selectCandidates() []*candidate { + now := rf.conf.clock.Now() candidates := make([]*candidate, 0, len(rf.candidates)) for _, cand := range rf.candidates { - if rf.candidateOk(cand) { + if cand.added.Add(rf.conf.maxCandidateAge).After(now) { candidates = append(candidates, cand) } } From 8e75221455956dfc7c9d26a36e8caea56ffde8f9 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Wed, 22 Feb 2023 17:31:23 -0800 Subject: [PATCH 4/7] Remove clock fork --- go.mod | 2 -- go.sum | 5 +++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 517f1e9850..21b8459561 100644 --- a/go.mod +++ b/go.mod @@ -115,5 +115,3 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/blake3 v1.1.7 // indirect ) - -replace github.com/benbjohnson/clock => github.com/marcopolo/clock v1.4.0 diff --git a/go.sum b/go.sum index 2be08bd773..b73b9e638c 100644 --- a/go.sum +++ b/go.sum @@ -51,6 +51,9 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= +github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= +github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -338,8 +341,6 @@ github.com/libp2p/zeroconf/v2 v2.2.0/go.mod h1:fuJqLnUwZTshS3U/bMRJ3+ow/v9oid1n0 github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= -github.com/marcopolo/clock v1.4.0 h1:kL3wgpJFTB1Zm6SQJkLQMwRV5zPqnPSgz7Lw37H83OI= -github.com/marcopolo/clock v1.4.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd h1:br0buuQ854V8u83wA0rVZ8ttrq5CpaPZdvrK0LP2lOk= github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd/go.mod h1:QuCEs1Nt24+FYQEqAAncTDPJIuGs+LxK1MCiFL25pMU= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= From cacdcb69e04e71926eddefbf7dba9075f14ac7cf Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Wed, 22 Feb 2023 18:48:26 -0800 Subject: [PATCH 5/7] Rename --- p2p/host/autorelay/relay_finder.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/p2p/host/autorelay/relay_finder.go b/p2p/host/autorelay/relay_finder.go index 8400c29efc..d0b6544d7a 100644 --- a/p2p/host/autorelay/relay_finder.go +++ b/p2p/host/autorelay/relay_finder.go @@ -179,14 +179,14 @@ func (rf *relayFinder) background(ctx context.Context) { rf.relayMx.Unlock() if push { - rf.clearCachedAddrsAndIdentifyPush() + rf.clearCachedAddrsAndSignalAddressChange() } 
case <-rf.candidateFound: rf.notifyMaybeConnectToRelay() case <-bootDelayTimer.C: rf.notifyMaybeConnectToRelay() case <-rf.relayUpdated: - rf.clearCachedAddrsAndIdentifyPush() + rf.clearCachedAddrsAndSignalAddressChange() case <-workTimer.C: now := rf.conf.clock.Now() nextTime := rf.runScheduledWork(ctx, now, scheduledWork, peerSourceRateLimiter) @@ -197,7 +197,7 @@ func (rf *relayFinder) background(ctx context.Context) { } } -func (rf *relayFinder) clearCachedAddrsAndIdentifyPush() { +func (rf *relayFinder) clearCachedAddrsAndSignalAddressChange() { rf.relayMx.Lock() rf.cachedAddrs = nil rf.relayMx.Unlock() @@ -210,7 +210,7 @@ func (rf *relayFinder) runScheduledWork(ctx context.Context, now time.Time, sche if now.After(scheduledWork.nextRefresh) { scheduledWork.nextRefresh = now.Add(rsvpRefreshInterval) if rf.refreshReservations(ctx, now) { - rf.clearCachedAddrsAndIdentifyPush() + rf.clearCachedAddrsAndSignalAddressChange() } } From ad305281616b1c93f7e1d7e6085bbb3a110d67cb Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Wed, 22 Feb 2023 18:51:56 -0800 Subject: [PATCH 6/7] Use scheduledWork.nextAllowedCallToPeerSource.Add(rf.conf.minInterval) --- p2p/host/autorelay/relay_finder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/host/autorelay/relay_finder.go b/p2p/host/autorelay/relay_finder.go index d0b6544d7a..36db86b579 100644 --- a/p2p/host/autorelay/relay_finder.go +++ b/p2p/host/autorelay/relay_finder.go @@ -223,7 +223,7 @@ func (rf *relayFinder) runScheduledWork(ctx context.Context, now time.Time, sche } if now.After(scheduledWork.nextAllowedCallToPeerSource) { - scheduledWork.nextAllowedCallToPeerSource = now.Add(rf.conf.minInterval) + scheduledWork.nextAllowedCallToPeerSource = scheduledWork.nextAllowedCallToPeerSource.Add(rf.conf.minInterval) select { case peerSourceRateLimiter <- struct{}{}: default: From 7f379d7b78b3d559fc2a19e42923c88312c9c2bb Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Wed, 22 Feb 2023 21:18:49 -0800 Subject: [PATCH 7/7] Fix flaky test that relied on time --- p2p/host/autorelay/autorelay_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/p2p/host/autorelay/autorelay_test.go b/p2p/host/autorelay/autorelay_test.go index 04c39a1e58..b31b465b68 100644 --- a/p2p/host/autorelay/autorelay_test.go +++ b/p2p/host/autorelay/autorelay_test.go @@ -426,6 +426,7 @@ func TestReconnectToStaticRelays(t *testing.T) { } func TestMinInterval(t *testing.T) { + cl := clock.NewMock() h := newPrivateNode(t, func(context.Context, int) <-chan peer.AddrInfo { peerChan := make(chan peer.AddrInfo, 1) @@ -435,6 +436,7 @@ func TestMinInterval(t *testing.T) { peerChan <- peer.AddrInfo{ID: r1.ID(), Addrs: r1.Addrs()} return peerChan }, + autorelay.WithClock(cl), autorelay.WithMinCandidates(2), autorelay.WithNumRelays(1), autorelay.WithBootDelay(time.Hour), @@ -442,7 +444,9 @@ func TestMinInterval(t *testing.T) { ) defer h.Close() + cl.Add(500 * time.Millisecond) // The second call to peerSource should happen after 1 second require.Never(t, func() bool { return numRelays(h) > 0 }, 500*time.Millisecond, 100*time.Millisecond) + cl.Add(500 * time.Millisecond) require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 100*time.Millisecond) }
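
Reviewer note on the series as a whole: the core change in PATCH 3 is dropping the per-task tickers (and the deleted timer.go wrapper) in favor of one workTimer that runScheduledWork reprograms to the earliest upcoming deadline, plus a peerSourceRateLimiter channel that throttles calls to the peer source from findNodes. Below is a minimal, standalone sketch of that single-timer scheduling pattern, assuming only the Go standard library; the names (task, runDue, workTimer in main) are illustrative and are not identifiers from the patch or the libp2p API.

package main

import (
	"fmt"
	"time"
)

// task is one piece of periodic work with its own interval and next deadline,
// analogous to nextRefresh / nextBackoff / nextOldCandidateCheck in the patch.
type task struct {
	name     string
	interval time.Duration
	next     time.Time
	run      func(now time.Time)
}

// runDue executes every task whose deadline has passed and returns the
// earliest next deadline; the caller resets its single timer to that time,
// mirroring how runScheduledWork returns the next wake-up for workTimer.
func runDue(tasks []*task, now time.Time) time.Time {
	next := now.Add(24 * time.Hour) // fallback, playing the role of leastFrequentInterval
	for _, t := range tasks {
		if now.After(t.next) {
			t.run(now)
			t.next = now.Add(t.interval)
		}
		if t.next.Before(next) {
			next = t.next
		}
	}
	if !next.After(now) {
		next = next.Add(time.Nanosecond) // avoid a zero-duration reset, as the patch does with Add(1)
	}
	return next
}

func main() {
	now := time.Now()
	tasks := []*task{
		{name: "refreshReservations", interval: 200 * time.Millisecond, run: func(time.Time) { fmt.Println("refresh reservations") }},
		{name: "clearBackoff", interval: 500 * time.Millisecond, run: func(time.Time) { fmt.Println("clear backoff") }},
	}

	// Zero-valued next deadlines mean every task is due on the first pass.
	deadline := runDue(tasks, now)
	workTimer := time.NewTimer(time.Until(deadline))
	defer workTimer.Stop()

	// Bounded loop for the sketch; the real background loop also selects on
	// ctx.Done(), event-bus notifications, and the boot-delay timer.
	for i := 0; i < 3; i++ {
		fired := <-workTimer.C
		deadline = runDue(tasks, fired)
		workTimer.Reset(time.Until(deadline))
	}
}

Computing deadlines from a single clock reading is also what lets the tests in this series keep using the mock clock (benbjohnson/clock): advancing the mock clock moves every deadline at once instead of racing several independent tickers.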