Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

autorelay: Refactor relay_finder and start autorelay after identify #2120

Merged
merged 7 commits into from
Feb 23, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 128 additions & 95 deletions p2p/host/autorelay/relay_finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"golang.org/x/sync/errgroup"

"github.com/benbjohnson/clock"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -101,21 +100,19 @@ func newRelayFinder(host *basic.BasicHost, peerSource PeerSource, conf *config)
}

type scheduledWorkTimes struct {
leastFrequentInterval time.Duration
nextRefresh time.Time
nextBackoff time.Time
nextOldCandidateCheck time.Time
nextAllowedCallToPeerSource time.Time
peerSourceTimer *clock.Timer
needMorePeers bool
}

func (rf *relayFinder) background(ctx context.Context) {
needMorePeersChan := make(chan struct{}, 1)
canCallPeerSourceChan := make(chan struct{}, 1)
peerSourceRateLimiter := make(chan struct{}, 1)
rf.refCount.Add(1)
go func() {
defer rf.refCount.Done()
rf.findNodes(ctx, needMorePeersChan, canCallPeerSourceChan)
rf.findNodes(ctx, peerSourceRateLimiter)
}()

rf.refCount.Add(1)
Expand All @@ -131,30 +128,35 @@ func (rf *relayFinder) background(ctx context.Context) {
}
defer subConnectedness.Close()

peerSourceTimer := rf.conf.clock.Timer(rf.conf.minInterval)
defer peerSourceTimer.Stop()
bootDelayTimer := rf.conf.clock.Timer(rf.conf.bootDelay)
defer bootDelayTimer.Stop()
refreshTicker := rf.conf.clock.Ticker(rsvpRefreshInterval)
defer refreshTicker.Stop()
backoffTicker := rf.conf.clock.Ticker(rf.conf.backoff / 5)
defer backoffTicker.Stop()
oldCandidateTicker := rf.conf.clock.Ticker(rf.conf.maxCandidateAge / 5)
defer oldCandidateTicker.Stop()

// This is the least frequent event. It's our fallback timer if we don't have any other work to do.
// It is the largest of minInterval, backoff, maxCandidateAge and rsvpRefreshInterval.
leastFrequentInterval := rf.conf.minInterval
if rf.conf.backoff > leastFrequentInterval {
leastFrequentInterval = rf.conf.backoff
}
if rf.conf.maxCandidateAge > leastFrequentInterval {
leastFrequentInterval = rf.conf.maxCandidateAge
}
if rsvpRefreshInterval > leastFrequentInterval {
// Assign the value that was just compared. Assigning maxCandidateAge here
// would leave the fallback smaller than rsvpRefreshInterval.
leastFrequentInterval = rsvpRefreshInterval
}

now := rf.conf.clock.Now()

scheduledWork := &scheduledWorkTimes{
nextRefresh: rf.conf.clock.Now().Add(rsvpRefreshInterval),
nextBackoff: rf.conf.clock.Now().Add(rf.conf.backoff / 5),
nextOldCandidateCheck: rf.conf.clock.Now().Add(rf.conf.maxCandidateAge / 5),
nextAllowedCallToPeerSource: rf.conf.clock.Now().Add(-time.Second), // allow immediately
peerSourceTimer: peerSourceTimer,
needMorePeers: true,
leastFrequentInterval: leastFrequentInterval,
nextRefresh: now.Add(rsvpRefreshInterval),
nextBackoff: now.Add(rf.conf.backoff / 5),
nextOldCandidateCheck: now.Add(rf.conf.maxCandidateAge / 5),
nextAllowedCallToPeerSource: now.Add(-time.Second), // allow immediately
}

for {
// when true, we need to identify push
var push bool
workTimer := rf.conf.clock.Timer(rf.runScheduledWork(ctx, now, scheduledWork, peerSourceRateLimiter).Sub(now))
defer workTimer.Stop()

for {
select {
case ev, ok := <-subConnectedness.Out():
if !ok {
Expand All @@ -164,6 +166,8 @@ func (rf *relayFinder) background(ctx context.Context) {
if evt.Connectedness != network.NotConnected {
continue
}
push := false

rf.relayMx.Lock()
if rf.usingRelay(evt.Peer) { // we were disconnected from a relay
log.Debugw("disconnected from relay", "id", evt.Peer)
Expand All @@ -173,102 +177,137 @@ func (rf *relayFinder) background(ctx context.Context) {
push = true
}
rf.relayMx.Unlock()

if push {
rf.clearCachedAddrsAndIdentifyPush()
}
case <-rf.candidateFound:
rf.notifyMaybeConnectToRelay()
case <-bootDelayTimer.C:
rf.notifyMaybeConnectToRelay()
case <-rf.relayUpdated:
push = true
case <-refreshTicker.C:
push = rf.runScheduledWork(ctx, rf.conf.clock.Now(), scheduledWork, canCallPeerSourceChan)
case <-backoffTicker.C:
log.Debugf("backoff ticker fired")
push = rf.runScheduledWork(ctx, rf.conf.clock.Now(), scheduledWork, canCallPeerSourceChan)
case <-oldCandidateTicker.C:
push = rf.runScheduledWork(ctx, rf.conf.clock.Now(), scheduledWork, canCallPeerSourceChan)
case <-peerSourceTimer.C:
push = rf.runScheduledWork(ctx, rf.conf.clock.Now(), scheduledWork, canCallPeerSourceChan)
case <-needMorePeersChan:
scheduledWork.needMorePeers = true
push = rf.runScheduledWork(ctx, rf.conf.clock.Now(), scheduledWork, canCallPeerSourceChan)
rf.clearCachedAddrsAndIdentifyPush()
case <-workTimer.C:
now := rf.conf.clock.Now()
nextTime := rf.runScheduledWork(ctx, now, scheduledWork, peerSourceRateLimiter)
workTimer.Reset(nextTime.Sub(now))
case <-ctx.Done():
return
}

if push {
rf.relayMx.Lock()
rf.cachedAddrs = nil
rf.relayMx.Unlock()
rf.host.SignalAddressChange()
}
}
}

func (rf *relayFinder) runScheduledWork(ctx context.Context, now time.Time, scheduledWork *scheduledWorkTimes, canCallPeerSourceChan chan<- struct{}) bool {
var push bool
func (rf *relayFinder) clearCachedAddrsAndIdentifyPush() {
MarcoPolo marked this conversation as resolved.
Show resolved Hide resolved
rf.relayMx.Lock()
rf.cachedAddrs = nil
rf.relayMx.Unlock()
rf.host.SignalAddressChange()
}

func (rf *relayFinder) runScheduledWork(ctx context.Context, now time.Time, scheduledWork *scheduledWorkTimes, peerSourceRateLimiter chan<- struct{}) time.Time {
nextTime := now.Add(scheduledWork.leastFrequentInterval)

if now.After(scheduledWork.nextRefresh) {
scheduledWork.nextRefresh = now.Add(rsvpRefreshInterval)
push = rf.refreshReservations(ctx, now)
if rf.refreshReservations(ctx, now) {
rf.clearCachedAddrsAndIdentifyPush()
}
}

if now.After(scheduledWork.nextBackoff) {
scheduledWork.nextBackoff = now.Add(rf.conf.backoff / 5)
rf.candidateMx.Lock()
for id, t := range rf.backoff {
if !t.Add(rf.conf.backoff).After(now) {
log.Debugw("removing backoff for node", "id", id)
delete(rf.backoff, id)
}
}
rf.candidateMx.Unlock()

scheduledWork.nextBackoff = rf.clearBackoff(now)
}

if now.After(scheduledWork.nextOldCandidateCheck) {
scheduledWork.nextOldCandidateCheck = now.Add(rf.conf.maxCandidateAge / 5)
var deleted bool
rf.candidateMx.Lock()
for id, cand := range rf.candidates {
if !cand.added.Add(rf.conf.maxCandidateAge).After(now) {
deleted = true
log.Debugw("deleting candidate due to age", "id", id)
delete(rf.candidates, id)
}
}
rf.candidateMx.Unlock()
if deleted {
rf.notifyMaybeNeedNewCandidates()
}
scheduledWork.nextOldCandidateCheck = rf.clearOldCandidates(now)
}

if scheduledWork.needMorePeers && now.After(scheduledWork.nextAllowedCallToPeerSource) {
if now.After(scheduledWork.nextAllowedCallToPeerSource) {
scheduledWork.nextAllowedCallToPeerSource = now.Add(rf.conf.minInterval)
MarcoPolo marked this conversation as resolved.
Show resolved Hide resolved
scheduledWork.needMorePeers = false
select {
case peerSourceRateLimiter <- struct{}{}:
default:
}
}

if !scheduledWork.peerSourceTimer.Stop() {
// Maybe drain the channel
select {
case <-scheduledWork.peerSourceTimer.C:
default:
// Find the next time we need to run scheduled work.
if scheduledWork.nextRefresh.Before(nextTime) {
nextTime = scheduledWork.nextRefresh
}
if scheduledWork.nextBackoff.Before(nextTime) {
nextTime = scheduledWork.nextBackoff
}
if scheduledWork.nextOldCandidateCheck.Before(nextTime) {
nextTime = scheduledWork.nextOldCandidateCheck
}
if scheduledWork.nextAllowedCallToPeerSource.Before(nextTime) {
nextTime = scheduledWork.nextAllowedCallToPeerSource
}
if nextTime == now {
// Only happens in CI with a mock clock
nextTime = nextTime.Add(1) // avoids an infinite loop
}

return nextTime
}

// clearOldCandidates clears old candidates from the map. Returns the next time
// to run this function.
func (rf *relayFinder) clearOldCandidates(now time.Time) time.Time {
// If we don't have any candidates, we should run this again in rf.conf.maxCandidateAge.
nextTime := now.Add(rf.conf.maxCandidateAge)

var deleted bool
rf.candidateMx.Lock()
defer rf.candidateMx.Unlock()
for id, cand := range rf.candidates {
expiry := cand.added.Add(rf.conf.maxCandidateAge)
if expiry.After(now) {
if expiry.Before(nextTime) {
nextTime = expiry
}
} else {
deleted = true
log.Debugw("deleting candidate due to age", "id", id)
delete(rf.candidates, id)

}
scheduledWork.peerSourceTimer.Reset(scheduledWork.nextAllowedCallToPeerSource.Sub(now))
}
if deleted {
rf.notifyMaybeNeedNewCandidates()
}

select {
case canCallPeerSourceChan <- struct{}{}:
default:
return nextTime
}

// clearBackoff clears old backoff entries from the map. Returns the next time
// to run this function.
func (rf *relayFinder) clearBackoff(now time.Time) time.Time {
nextTime := now.Add(rf.conf.backoff)

rf.candidateMx.Lock()
defer rf.candidateMx.Unlock()
for id, t := range rf.backoff {
expiry := t.Add(rf.conf.backoff)
if expiry.After(now) {
if expiry.Before(nextTime) {
nextTime = expiry
}
} else {
log.Debugw("removing backoff for node", "id", id)
delete(rf.backoff, id)
}
}

return push
return nextTime
}

// findNodes accepts nodes from the channel and tests if they support relaying.
// It is run on both public and private nodes.
// It garbage collects old entries, so that nodes don't overflow.
// This makes sure that as soon as we need to find relay candidates, we have them available.
func (rf *relayFinder) findNodes(ctx context.Context, needMorePeersChan chan<- struct{}, canCallPeerSource <-chan struct{}) {
// peerSourceRateLimiter is used to limit how often we call the peer source.
func (rf *relayFinder) findNodes(ctx context.Context, peerSourceRateLimiter <-chan struct{}) {
var peerChan <-chan peer.AddrInfo
var wg sync.WaitGroup
for {
Expand All @@ -278,17 +317,16 @@ func (rf *relayFinder) findNodes(ctx context.Context, needMorePeersChan chan<- s

if peerChan == nil && numCandidates < rf.conf.minCandidates {
select {
case needMorePeersChan <- struct{}{}:
default:
case <-peerSourceRateLimiter:
peerChan = rf.peerSource(ctx, rf.conf.maxCandidates)
case <-ctx.Done():
return
}
}

select {
case <-rf.maybeRequestNewCandidates:
continue
case <-canCallPeerSource:
peerChan = rf.peerSource(ctx, rf.conf.maxCandidates)
continue
case pi, ok := <-peerChan:
if !ok {
wg.Wait()
Expand Down Expand Up @@ -587,18 +625,13 @@ func (rf *relayFinder) usingRelay(p peer.ID) bool {
return ok
}

func (rf *relayFinder) candidateOk(c *candidate) bool {
now := rf.conf.clock.Now()
// Check max age here as well in case our background ticker hasn't fired.
return c.added.Add(rf.conf.maxCandidateAge).After(now)
}

// selectCandidates returns an ordered slice of relay candidates.
// Callers should attempt to obtain reservations with the candidates in this order.
func (rf *relayFinder) selectCandidates() []*candidate {
now := rf.conf.clock.Now()
candidates := make([]*candidate, 0, len(rf.candidates))
for _, cand := range rf.candidates {
if rf.candidateOk(cand) {
if cand.added.Add(rf.conf.maxCandidateAge).After(now) {
candidates = append(candidates, cand)
}
}
Expand Down