Skip to content

Commit

Permalink
correctly handle static relays
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Mar 31, 2022
1 parent 29fd102 commit 7e767cb
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 33 deletions.
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ type Config struct {
Routing RoutingC

EnableAutoRelay bool
AutoRelayOpts []autorelay.Option
AutoNATConfig
AutoRelayOpts []autorelay.Option

EnableHolePunching bool
HolePunchingOptions []holepunch.Option
Expand Down
28 changes: 24 additions & 4 deletions p2p/host/autorelay/autorelay.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ type AutoRelay struct {
ctx context.Context
ctxCancel context.CancelFunc

conf *config

mx sync.Mutex
status network.Reachability

relayFinder *relayFinder

peerChanIn <-chan peer.AddrInfo // capacity 0
peerChanOut chan peer.AddrInfo // capacity 20
peerChanOut chan peer.AddrInfo // capacity 20

host host.Host
addrsF basic.AddrsFactory
Expand All @@ -47,8 +48,8 @@ func NewAutoRelay(bhost *basic.BasicHost, opts ...Option) (*AutoRelay, error) {
return nil, err
}
}
r.peerChanIn = conf.peerChan
r.peerChanOut = make(chan peer.AddrInfo, conf.maxCandidates)
r.conf = &conf
r.relayFinder = newRelayFinder(bhost, r.peerChanOut, &conf)
bhost.AddrsFactory = r.hostAddrs

Expand All @@ -68,6 +69,25 @@ func (r *AutoRelay) background() {
}
defer subReachability.Close()

var peerChan <-chan peer.AddrInfo
if len(r.conf.staticRelays) == 0 {
peerChan = r.conf.peerChan
} else {
pc := make(chan peer.AddrInfo)
peerChan = pc
r.refCount.Add(1)
go func() {
defer r.refCount.Done()
for _, sr := range r.conf.staticRelays {
select {
case pc <- sr:
case <-r.ctx.Done():
return
}
}
}()
}

for {
select {
case <-r.ctx.Done():
Expand All @@ -89,7 +109,7 @@ func (r *AutoRelay) background() {
r.mx.Lock()
r.status = evt.Reachability
r.mx.Unlock()
case pi := <-r.peerChanIn:
case pi := <-peerChan:
select {
case r.peerChanOut <- pi: // if there's space in the channel, great
default:
Expand Down
27 changes: 23 additions & 4 deletions p2p/host/autorelay/autorelay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ func newBrokenRelay(t *testing.T, workAfter int) host.Host {
require.NoError(t, err)
var n int32
h.SetStreamHandler(circuitv2_proto.ProtoIDv2Hop, func(str network.Stream) {
t.Log("rejecting reservation")
str.Reset()
num := atomic.AddInt32(&n, 1)
if int(num) >= workAfter {
Expand Down Expand Up @@ -189,12 +188,32 @@ func TestMaxBackoffs(t *testing.T) {
)
defer h.Close()

r1 := newBrokenRelay(t, 4)
t.Cleanup(func() { r1.Close() })
peerChan <- peer.AddrInfo{ID: r1.ID(), Addrs: r1.Addrs()}
r := newBrokenRelay(t, 4)
t.Cleanup(func() { r.Close() })
peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}

// make sure we don't add any relays yet
require.Never(t, func() bool {
return len(ma.FilterAddrs(h.Addrs(), isRelayAddr)) > 0
}, 300*time.Millisecond, 50*time.Millisecond)
}

func TestStaticRelays(t *testing.T) {
const numRelays = 3
var staticRelays []peer.AddrInfo
for i := 0; i < numRelays; i++ {
r := newRelay(t)
t.Cleanup(func() { r.Close() })
staticRelays = append(staticRelays, peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()})
}

h := newPrivateNode(t,
autorelay.WithStaticRelays(staticRelays),
autorelay.WithNumRelays(1),
)
defer h.Close()

require.Eventually(t, func() bool {
return len(ma.FilterAddrs(h.Addrs(), isRelayAddr)) > 0
}, 2*time.Second, 50*time.Millisecond)
}
6 changes: 3 additions & 3 deletions p2p/host/autorelay/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ func init() {
type Option func(*config) error

func WithStaticRelays(static []peer.AddrInfo) Option {
return func(r *config) error {
if len(r.staticRelays) > 0 {
return func(c *config) error {
if len(c.staticRelays) > 0 {
return errors.New("can't set static relays, static relays already configured")
}
r.staticRelays = static
c.staticRelays = static
return nil
}
}
Expand Down
44 changes: 23 additions & 21 deletions p2p/host/autorelay/relay_finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,11 @@ func newRelayFinder(host *basic.BasicHost, peerChan <-chan peer.AddrInfo, conf *
}

func (rf *relayFinder) background(ctx context.Context) {
if len(rf.conf.staticRelays) == 0 {
rf.refCount.Add(1)
go func() {
defer rf.refCount.Done()
rf.findNodes(ctx)
}()
}
rf.refCount.Add(1)
go func() {
defer rf.refCount.Done()
rf.findNodes(ctx)
}()

subConnectedness, err := rf.host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged))
if err != nil {
Expand Down Expand Up @@ -138,10 +136,8 @@ func (rf *relayFinder) background(ctx context.Context) {
}
rf.relayMx.Unlock()
case <-rf.candidateFound:
log.Debugf("candidate found")
rf.handleNewCandidate(ctx)
case <-bootDelayTimer.C:
log.Debugf("boot delay timer")
rf.handleNewCandidate(ctx)
case <-rf.relayUpdated:
push = true
Expand All @@ -163,7 +159,7 @@ func (rf *relayFinder) background(ctx context.Context) {
}

// findNodes accepts nodes from the channel and tests if they support relaying.
// It is run on both public and private nodes (but not when static relays are set).
// It is run on both public and private nodes.
// It garbage collects old entries, so that nodes doesn't overflow.
// This makes sure that as soon as we need to find relay candidates, we have them available.
func (rf *relayFinder) findNodes(ctx context.Context) {
Expand All @@ -189,6 +185,13 @@ func (rf *relayFinder) findNodes(ctx context.Context) {
}
}

func (rf *relayFinder) notifyNewCandidate() {
select {
case rf.candidateFound <- struct{}{}:
default:
}
}

// handleNewNode tests if a peer supports circuit v1 or v2.
// This method is only run on private nodes.
// If a peer does, it is added to the candidates map.
Expand Down Expand Up @@ -220,13 +223,6 @@ func (rf *relayFinder) handleNewNode(ctx context.Context, pi peer.AddrInfo) {
rf.notifyNewCandidate()
}

func (rf *relayFinder) notifyNewCandidate() {
select {
case rf.candidateFound <- struct{}{}:
default:
}
}

// tryNode checks if a peer actually supports either circuit v1 or circuit v2.
// It does not modify any internal state.
func (rf *relayFinder) tryNode(ctx context.Context, pi peer.AddrInfo) (supportsRelayV1 bool, err error) {
Expand Down Expand Up @@ -293,10 +289,16 @@ func (rf *relayFinder) handleNewCandidate(ctx context.Context) {
if len(rf.relays) == rf.conf.desiredRelays {
return
}
// During the startup phase, we don't want to connect to the first candidate that we find.
// Instead, we wait until we've found at least minCandidates, and then select the best of those.
// However, if that takes too long (longer than bootDelay),
if len(rf.relays) == 0 && len(rf.candidates) < rf.conf.minCandidates && time.Since(rf.bootTime) < rf.conf.bootDelay {

if len(rf.conf.staticRelays) != 0 {
// make sure we read all static relays before continuing
if len(rf.peerChan) > 0 && len(rf.candidates) < rf.conf.minCandidates && time.Since(rf.bootTime) < rf.conf.bootDelay {
return
}
} else if len(rf.relays) == 0 && len(rf.candidates) < rf.conf.minCandidates && time.Since(rf.bootTime) < rf.conf.bootDelay {
// During the startup phase, we don't want to connect to the first candidate that we find.
// Instead, we wait until we've found at least minCandidates, and then select the best of those.
// However, if that takes too long (longer than bootDelay), we still go ahead.
return
}

Expand Down

0 comments on commit 7e767cb

Please sign in to comment.