From b74205d265f1b92c9878248a7b02391259c656f6 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Wed, 22 Feb 2023 21:51:59 -0800 Subject: [PATCH] autorelay: refactor relay finder and start autorelay after identify (#2120) * Refactor relay_finder and start autorelay after identify * Clock fork * Remove multiple timers and use a single rate limiting chan for findNodes * Remove clock fork * Rename * Use scheduledWork.nextAllowedCallToPeerSource.Add(rf.conf.minInterval) * Fix flaky test that relied on time --- config/config.go | 4 +- p2p/host/autorelay/autorelay.go | 5 +- p2p/host/autorelay/autorelay_test.go | 106 +++++-------- p2p/host/autorelay/host.go | 4 + p2p/host/autorelay/relay_finder.go | 223 +++++++++++++++++++-------- p2p/host/autorelay/timer.go | 42 ----- 6 files changed, 215 insertions(+), 169 deletions(-) delete mode 100644 p2p/host/autorelay/timer.go diff --git a/config/config.go b/config/config.go index fc16989417..d3824c2a9f 100644 --- a/config/config.go +++ b/config/config.go @@ -433,7 +433,9 @@ func (cfg *Config) NewNode() (host.Host, error) { ho = routed.Wrap(h, router) } if ar != nil { - return autorelay.NewAutoRelayHost(ho, ar), nil + arh := autorelay.NewAutoRelayHost(ho, ar) + arh.Start() + ho = arh } return ho, nil } diff --git a/p2p/host/autorelay/autorelay.go b/p2p/host/autorelay/autorelay.go index 58cebfb431..fc46a2cc53 100644 --- a/p2p/host/autorelay/autorelay.go +++ b/p2p/host/autorelay/autorelay.go @@ -49,12 +49,15 @@ func NewAutoRelay(bhost *basic.BasicHost, opts ...Option) (*AutoRelay, error) { r.relayFinder = newRelayFinder(bhost, conf.peerSource, &conf) bhost.AddrsFactory = r.hostAddrs + return r, nil +} + +func (r *AutoRelay) Start() { r.refCount.Add(1) go func() { defer r.refCount.Done() r.background() }() - return r, nil } func (r *AutoRelay) background() { diff --git a/p2p/host/autorelay/autorelay_test.go b/p2p/host/autorelay/autorelay_test.go index 2a03fd5dc1..b31b465b68 100644 --- a/p2p/host/autorelay/autorelay_test.go +++ b/p2p/host/autorelay/autorelay_test.go @@ -2,14 +2,12 @@ package autorelay_test import ( "context" - "os" "strings" "sync/atomic" "testing" "time" "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -97,7 +95,7 @@ func newRelay(t *testing.T) host.Host { } } return false - }, 500*time.Millisecond, 10*time.Millisecond) + }, time.Second, 10*time.Millisecond) return h } @@ -121,7 +119,7 @@ func TestSingleCandidate(t *testing.T) { ) defer h.Close() - require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond) + require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 100*time.Millisecond) // test that we don't add any more relays require.Never(t, func() bool { return numRelays(h) > 1 }, 200*time.Millisecond, 50*time.Millisecond) require.Equal(t, 1, counter, "expected the peer source callback to only have been called once") @@ -179,7 +177,7 @@ func TestWaitForCandidates(t *testing.T) { r2 := newRelay(t) t.Cleanup(func() { r2.Close() }) peerChan <- peer.AddrInfo{ID: r2.ID(), Addrs: r2.Addrs()} - require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond) + require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 100*time.Millisecond) } func TestBackoff(t *testing.T) { @@ -225,15 +223,19 @@ func TestBackoff(t *testing.T) { ) defer h.Close() - require.Eventually(t, func() bool { return reservations.Load() == 1 }, 3*time.Second, 20*time.Millisecond) + require.Eventually(t, func() bool { + return reservations.Load() == 1 + }, 10*time.Second, 20*time.Millisecond, "reservations load should be 1 was %d", reservations.Load()) // make sure we don't add any relays yet for i := 0; i < 2; i++ { cl.Add(backoff / 3) require.Equal(t, 1, int(reservations.Load())) } - cl.Add(backoff / 2) - require.Eventually(t, func() bool { return reservations.Load() == 2 }, 3*time.Second, 20*time.Millisecond) - require.Less(t, int(counter.Load()), 100) // just make sure we're not busy-looping + cl.Add(backoff) + require.Eventually(t, func() bool { + return reservations.Load() == 2 + }, 10*time.Second, 100*time.Millisecond, "reservations load should be 2 was %d", reservations.Load()) + require.Less(t, int(counter.Load()), 300) // just make sure we're not busy-looping require.Equal(t, 2, int(reservations.Load())) } @@ -252,7 +254,7 @@ func TestStaticRelays(t *testing.T) { ) defer h.Close() - require.Eventually(t, func() bool { return numRelays(h) > 0 }, 2*time.Second, 50*time.Millisecond) + require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 50*time.Millisecond) } func TestConnectOnDisconnect(t *testing.T) { @@ -275,7 +277,7 @@ func TestConnectOnDisconnect(t *testing.T) { ) defer h.Close() - require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond) + require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 100*time.Millisecond) relaysInUse := usedRelays(h) require.Len(t, relaysInUse, 1) oldRelay := relaysInUse[0] @@ -286,7 +288,7 @@ func TestConnectOnDisconnect(t *testing.T) { } } - require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond) + require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 100*time.Millisecond) relaysInUse = usedRelays(h) require.Len(t, relaysInUse, 1) require.NotEqualf(t, oldRelay, relaysInUse[0], "old relay should not be used again") @@ -332,28 +334,31 @@ func TestMaxAge(t *testing.T) { ) defer h.Close() - require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond) + require.Eventually(t, func() bool { + return numRelays(h) > 0 + }, 10*time.Second, 100*time.Millisecond) relays := usedRelays(h) require.Len(t, relays, 1) - waitFor := 500 * time.Millisecond - tick := 100 * time.Millisecond - if os.Getenv("CI") != "" { - // Only increase the waitFor since we are increasing the mock clock every tick. - waitFor *= 10 - } require.Eventually(t, func() bool { // we don't know exactly when the timer is reset, just advance our timer multiple times if necessary - cl.Add(time.Second) + cl.Add(30 * time.Second) return len(peerChans) == 0 - }, waitFor, tick) + }, 10*time.Second, 100*time.Millisecond) cl.Add(10 * time.Minute) for _, r := range relays2 { peerChan2 <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()} } cl.Add(11 * time.Minute) + + require.Eventually(t, func() bool { + relays = usedRelays(h) + return len(relays) == 1 + }, 10*time.Second, 100*time.Millisecond) + // by now the 3 relays should have been garbage collected + // And we should only be using a single relay. Lets close it. var oldRelay peer.ID for _, r := range relays1 { if r.ID() == relays[0] { @@ -369,7 +374,7 @@ func TestMaxAge(t *testing.T) { return false } return relays[0] != oldRelay - }, 3*time.Second, 100*time.Millisecond) + }, 10*time.Second, 100*time.Millisecond) require.Len(t, relays, 1) ids := make([]peer.ID, 0, len(relays2)) @@ -379,40 +384,6 @@ func TestMaxAge(t *testing.T) { require.Contains(t, ids, relays[0]) } -func expectDeltaInAddrUpdated(t *testing.T, addrUpdated event.Subscription, expectedDelta int) { - t.Helper() - delta := 0 - for { - select { - case evAny := <-addrUpdated.Out(): - ev := evAny.(event.EvtLocalAddressesUpdated) - for _, updatedAddr := range ev.Removed { - if updatedAddr.Action == event.Removed { - if _, err := updatedAddr.Address.ValueForProtocol(ma.P_CIRCUIT); err == nil { - delta-- - if delta == expectedDelta { - return - } - } - } - } - for _, updatedAddr := range ev.Current { - if updatedAddr.Action == event.Added { - if _, err := updatedAddr.Address.ValueForProtocol(ma.P_CIRCUIT); err == nil { - delta++ - if delta == expectedDelta { - return - } - } - } - } - case <-time.After(10 * time.Second): - t.Fatal("timeout waiting for address updated event") - } - } - -} - func TestReconnectToStaticRelays(t *testing.T) { cl := clock.NewMock() var staticRelays []peer.AddrInfo @@ -428,16 +399,14 @@ func TestReconnectToStaticRelays(t *testing.T) { h := newPrivateNodeWithStaticRelays(t, staticRelays, autorelay.WithClock(cl), + autorelay.WithBackoff(30*time.Minute), ) - defer h.Close() - addrUpdated, err := h.EventBus().Subscribe(new(event.EvtLocalAddressesUpdated)) - require.NoError(t, err) - - expectDeltaInAddrUpdated(t, addrUpdated, 1) - cl.Add(time.Minute) + require.Eventually(t, func() bool { + return numRelays(h) == 1 + }, 10*time.Second, 100*time.Millisecond) relaysInUse := usedRelays(h) oldRelay := relaysInUse[0] @@ -446,12 +415,18 @@ func TestReconnectToStaticRelays(t *testing.T) { r.Network().ClosePeer(h.ID()) } } + require.Eventually(t, func() bool { + return numRelays(h) == 0 + }, 10*time.Second, 100*time.Millisecond) cl.Add(time.Hour) - expectDeltaInAddrUpdated(t, addrUpdated, -1) + require.Eventually(t, func() bool { + return numRelays(h) == 1 + }, 10*time.Second, 100*time.Millisecond) } func TestMinInterval(t *testing.T) { + cl := clock.NewMock() h := newPrivateNode(t, func(context.Context, int) <-chan peer.AddrInfo { peerChan := make(chan peer.AddrInfo, 1) @@ -461,6 +436,7 @@ func TestMinInterval(t *testing.T) { peerChan <- peer.AddrInfo{ID: r1.ID(), Addrs: r1.Addrs()} return peerChan }, + autorelay.WithClock(cl), autorelay.WithMinCandidates(2), autorelay.WithNumRelays(1), autorelay.WithBootDelay(time.Hour), @@ -468,7 +444,9 @@ func TestMinInterval(t *testing.T) { ) defer h.Close() + cl.Add(500 * time.Millisecond) // The second call to peerSource should happen after 1 second require.Never(t, func() bool { return numRelays(h) > 0 }, 500*time.Millisecond, 100*time.Millisecond) - require.Eventually(t, func() bool { return numRelays(h) > 0 }, 1*time.Second, 100*time.Millisecond) + cl.Add(500 * time.Millisecond) + require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 100*time.Millisecond) } diff --git a/p2p/host/autorelay/host.go b/p2p/host/autorelay/host.go index 740ca23621..c6bd9c5706 100644 --- a/p2p/host/autorelay/host.go +++ b/p2p/host/autorelay/host.go @@ -14,6 +14,10 @@ func (h *AutoRelayHost) Close() error { return h.Host.Close() } +func (h *AutoRelayHost) Start() { + h.ar.Start() +} + func NewAutoRelayHost(h host.Host, ar *AutoRelay) *AutoRelayHost { return &AutoRelayHost{Host: h, ar: ar} } diff --git a/p2p/host/autorelay/relay_finder.go b/p2p/host/autorelay/relay_finder.go index 5c6e903404..36db86b579 100644 --- a/p2p/host/autorelay/relay_finder.go +++ b/p2p/host/autorelay/relay_finder.go @@ -99,11 +99,20 @@ func newRelayFinder(host *basic.BasicHost, peerSource PeerSource, conf *config) } } +type scheduledWorkTimes struct { + leastFrequentInterval time.Duration + nextRefresh time.Time + nextBackoff time.Time + nextOldCandidateCheck time.Time + nextAllowedCallToPeerSource time.Time +} + func (rf *relayFinder) background(ctx context.Context) { + peerSourceRateLimiter := make(chan struct{}, 1) rf.refCount.Add(1) go func() { defer rf.refCount.Done() - rf.findNodes(ctx) + rf.findNodes(ctx, peerSourceRateLimiter) }() rf.refCount.Add(1) @@ -121,17 +130,33 @@ func (rf *relayFinder) background(ctx context.Context) { bootDelayTimer := rf.conf.clock.Timer(rf.conf.bootDelay) defer bootDelayTimer.Stop() - refreshTicker := rf.conf.clock.Ticker(rsvpRefreshInterval) - defer refreshTicker.Stop() - backoffTicker := rf.conf.clock.Ticker(rf.conf.backoff / 5) - defer backoffTicker.Stop() - oldCandidateTicker := rf.conf.clock.Ticker(rf.conf.maxCandidateAge / 5) - defer oldCandidateTicker.Stop() - for { - // when true, we need to identify push - var push bool + // This is the least frequent event. It's our fallback timer if we don't have any other work to do. + leastFrequentInterval := rf.conf.minInterval + if rf.conf.backoff > leastFrequentInterval { + leastFrequentInterval = rf.conf.backoff + } + if rf.conf.maxCandidateAge > leastFrequentInterval { + leastFrequentInterval = rf.conf.maxCandidateAge + } + if rsvpRefreshInterval > leastFrequentInterval { + leastFrequentInterval = rf.conf.maxCandidateAge + } + + now := rf.conf.clock.Now() + + scheduledWork := &scheduledWorkTimes{ + leastFrequentInterval: leastFrequentInterval, + nextRefresh: now.Add(rsvpRefreshInterval), + nextBackoff: now.Add(rf.conf.backoff / 5), + nextOldCandidateCheck: now.Add(rf.conf.maxCandidateAge / 5), + nextAllowedCallToPeerSource: now.Add(-time.Second), // allow immediately + } + + workTimer := rf.conf.clock.Timer(rf.runScheduledWork(ctx, now, scheduledWork, peerSourceRateLimiter).Sub(now)) + defer workTimer.Stop() + for { select { case ev, ok := <-subConnectedness.Out(): if !ok { @@ -141,6 +166,8 @@ func (rf *relayFinder) background(ctx context.Context) { if evt.Connectedness != network.NotConnected { continue } + push := false + rf.relayMx.Lock() if rf.usingRelay(evt.Peer) { // we were disconnected from a relay log.Debugw("disconnected from relay", "id", evt.Peer) @@ -150,85 +177,156 @@ func (rf *relayFinder) background(ctx context.Context) { push = true } rf.relayMx.Unlock() + + if push { + rf.clearCachedAddrsAndSignalAddressChange() + } case <-rf.candidateFound: rf.notifyMaybeConnectToRelay() case <-bootDelayTimer.C: rf.notifyMaybeConnectToRelay() case <-rf.relayUpdated: - push = true - case now := <-refreshTicker.C: - push = rf.refreshReservations(ctx, now) - case now := <-backoffTicker.C: - rf.candidateMx.Lock() - for id, t := range rf.backoff { - if !t.Add(rf.conf.backoff).After(now) { - log.Debugw("removing backoff for node", "id", id) - delete(rf.backoff, id) - } - } - rf.candidateMx.Unlock() - case now := <-oldCandidateTicker.C: - var deleted bool - rf.candidateMx.Lock() - for id, cand := range rf.candidates { - if !cand.added.Add(rf.conf.maxCandidateAge).After(now) { - deleted = true - log.Debugw("deleting candidate due to age", "id", id) - delete(rf.candidates, id) - } - } - rf.candidateMx.Unlock() - if deleted { - rf.notifyMaybeNeedNewCandidates() - } + rf.clearCachedAddrsAndSignalAddressChange() + case <-workTimer.C: + now := rf.conf.clock.Now() + nextTime := rf.runScheduledWork(ctx, now, scheduledWork, peerSourceRateLimiter) + workTimer.Reset(nextTime.Sub(now)) case <-ctx.Done(): return } + } +} - if push { - rf.relayMx.Lock() - rf.cachedAddrs = nil - rf.relayMx.Unlock() - rf.host.SignalAddressChange() +func (rf *relayFinder) clearCachedAddrsAndSignalAddressChange() { + rf.relayMx.Lock() + rf.cachedAddrs = nil + rf.relayMx.Unlock() + rf.host.SignalAddressChange() +} + +func (rf *relayFinder) runScheduledWork(ctx context.Context, now time.Time, scheduledWork *scheduledWorkTimes, peerSourceRateLimiter chan<- struct{}) time.Time { + nextTime := now.Add(scheduledWork.leastFrequentInterval) + + if now.After(scheduledWork.nextRefresh) { + scheduledWork.nextRefresh = now.Add(rsvpRefreshInterval) + if rf.refreshReservations(ctx, now) { + rf.clearCachedAddrsAndSignalAddressChange() + } + } + + if now.After(scheduledWork.nextBackoff) { + scheduledWork.nextBackoff = rf.clearBackoff(now) + } + + if now.After(scheduledWork.nextOldCandidateCheck) { + scheduledWork.nextOldCandidateCheck = rf.clearOldCandidates(now) + } + + if now.After(scheduledWork.nextAllowedCallToPeerSource) { + scheduledWork.nextAllowedCallToPeerSource = scheduledWork.nextAllowedCallToPeerSource.Add(rf.conf.minInterval) + select { + case peerSourceRateLimiter <- struct{}{}: + default: } } + + // Find the next time we need to run scheduled work. + if scheduledWork.nextRefresh.Before(nextTime) { + nextTime = scheduledWork.nextRefresh + } + if scheduledWork.nextBackoff.Before(nextTime) { + nextTime = scheduledWork.nextBackoff + } + if scheduledWork.nextOldCandidateCheck.Before(nextTime) { + nextTime = scheduledWork.nextOldCandidateCheck + } + if scheduledWork.nextAllowedCallToPeerSource.Before(nextTime) { + nextTime = scheduledWork.nextAllowedCallToPeerSource + } + if nextTime == now { + // Only happens in CI with a mock clock + nextTime = nextTime.Add(1) // avoids an infinite loop + } + + return nextTime +} + +// clearOldCandidates clears old candidates from the map. Returns the next time +// to run this function. +func (rf *relayFinder) clearOldCandidates(now time.Time) time.Time { + // If we don't have any candidates, we should run this again in rf.conf.maxCandidateAge. + nextTime := now.Add(rf.conf.maxCandidateAge) + + var deleted bool + rf.candidateMx.Lock() + defer rf.candidateMx.Unlock() + for id, cand := range rf.candidates { + expiry := cand.added.Add(rf.conf.maxCandidateAge) + if expiry.After(now) { + if expiry.Before(nextTime) { + nextTime = expiry + } + } else { + deleted = true + log.Debugw("deleting candidate due to age", "id", id) + delete(rf.candidates, id) + + } + } + if deleted { + rf.notifyMaybeNeedNewCandidates() + } + + return nextTime +} + +// clearBackoff clears old backoff entries from the map. Returns the next time +// to run this function. +func (rf *relayFinder) clearBackoff(now time.Time) time.Time { + nextTime := now.Add(rf.conf.backoff) + + rf.candidateMx.Lock() + defer rf.candidateMx.Unlock() + for id, t := range rf.backoff { + expiry := t.Add(rf.conf.backoff) + if expiry.After(now) { + if expiry.Before(nextTime) { + nextTime = expiry + } + } else { + log.Debugw("removing backoff for node", "id", id) + delete(rf.backoff, id) + } + } + + return nextTime } // findNodes accepts nodes from the channel and tests if they support relaying. // It is run on both public and private nodes. // It garbage collects old entries, so that nodes doesn't overflow. // This makes sure that as soon as we need to find relay candidates, we have them available. -func (rf *relayFinder) findNodes(ctx context.Context) { - peerChan := rf.peerSource(ctx, rf.conf.maxCandidates) +// peerSourceRateLimiter is used to limit how often we call the peer source. +func (rf *relayFinder) findNodes(ctx context.Context, peerSourceRateLimiter <-chan struct{}) { + var peerChan <-chan peer.AddrInfo var wg sync.WaitGroup - lastCallToPeerSource := rf.conf.clock.Now() - - timer := newTimer(rf.conf.clock) for { rf.candidateMx.Lock() numCandidates := len(rf.candidates) rf.candidateMx.Unlock() - if peerChan == nil { - now := rf.conf.clock.Now() - nextAllowedCallToPeerSource := lastCallToPeerSource.Add(rf.conf.minInterval).Sub(now) - if numCandidates < rf.conf.minCandidates { - log.Debugw("not enough candidates. Resetting timer", "num", numCandidates, "desired", rf.conf.minCandidates) - timer.Reset(nextAllowedCallToPeerSource) + if peerChan == nil && numCandidates < rf.conf.minCandidates { + select { + case <-peerSourceRateLimiter: + peerChan = rf.peerSource(ctx, rf.conf.maxCandidates) + case <-ctx.Done(): + return } } select { case <-rf.maybeRequestNewCandidates: continue - case now := <-timer.Chan(): - timer.SetRead() - if peerChan != nil { - // We're still reading peers from the peerChan. No need to query for more peers now. - continue - } - lastCallToPeerSource = now - peerChan = rf.peerSource(ctx, rf.conf.maxCandidates) case pi, ok := <-peerChan: if !ok { wg.Wait() @@ -530,9 +628,12 @@ func (rf *relayFinder) usingRelay(p peer.ID) bool { // selectCandidates returns an ordered slice of relay candidates. // Callers should attempt to obtain reservations with the candidates in this order. func (rf *relayFinder) selectCandidates() []*candidate { + now := rf.conf.clock.Now() candidates := make([]*candidate, 0, len(rf.candidates)) for _, cand := range rf.candidates { - candidates = append(candidates, cand) + if cand.added.Add(rf.conf.maxCandidateAge).After(now) { + candidates = append(candidates, cand) + } } // TODO: better relay selection strategy; this just selects random relays, diff --git a/p2p/host/autorelay/timer.go b/p2p/host/autorelay/timer.go deleted file mode 100644 index b554455302..0000000000 --- a/p2p/host/autorelay/timer.go +++ /dev/null @@ -1,42 +0,0 @@ -package autorelay - -import ( - "time" - - "github.com/benbjohnson/clock" -) - -type timer struct { - timer *clock.Timer - running bool - read bool -} - -func newTimer(cl clock.Clock) *timer { - t := cl.Timer(100 * time.Hour) // There's no way to initialize a stopped timer - t.Stop() - return &timer{timer: t} -} - -func (t *timer) Chan() <-chan time.Time { - return t.timer.C -} - -func (t *timer) Stop() { - if !t.running { - return - } - if !t.timer.Stop() && !t.read { - <-t.timer.C - } - t.read = false -} - -func (t *timer) SetRead() { - t.read = true -} - -func (t *timer) Reset(d time.Duration) { - t.Stop() - t.timer.Reset(d) -}