From 16de44aa7b4a3de2c24f1d966565f36eb69c5316 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 28 Mar 2022 19:19:47 +0100 Subject: [PATCH 1/9] rename autorelay.go to relay_finder.go --- p2p/host/autorelay/{autorelay.go => relay_finder.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename p2p/host/autorelay/{autorelay.go => relay_finder.go} (100%) diff --git a/p2p/host/autorelay/autorelay.go b/p2p/host/autorelay/relay_finder.go similarity index 100% rename from p2p/host/autorelay/autorelay.go rename to p2p/host/autorelay/relay_finder.go From 29fd1022a490acc1a155a78170f09729b57c7aa5 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Wed, 30 Mar 2022 10:34:14 +0100 Subject: [PATCH 2/9] implement relay discovery in autorelay --- config/config.go | 22 +- options.go | 11 +- p2p/host/autorelay/autorelay.go | 123 +++++ p2p/host/autorelay/autorelay_test.go | 333 ++++++------ p2p/host/autorelay/doc.go | 28 - p2p/host/autorelay/log.go | 7 - p2p/host/autorelay/options.go | 135 +++++ p2p/host/autorelay/relay.go | 46 -- p2p/host/autorelay/relay_finder.go | 744 ++++++++++++++------------- 9 files changed, 796 insertions(+), 653 deletions(-) create mode 100644 p2p/host/autorelay/autorelay.go delete mode 100644 p2p/host/autorelay/doc.go delete mode 100644 p2p/host/autorelay/log.go create mode 100644 p2p/host/autorelay/options.go diff --git a/config/config.go b/config/config.go index a9a264158a..f3dc0ef410 100644 --- a/config/config.go +++ b/config/config.go @@ -18,7 +18,6 @@ import ( "github.com/libp2p/go-libp2p-core/transport" "github.com/libp2p/go-libp2p-peerstore/pstoremem" - drouting "github.com/libp2p/go-libp2p/p2p/discovery/routing" "github.com/libp2p/go-libp2p/p2p/host/autonat" "github.com/libp2p/go-libp2p/p2p/host/autorelay" bhost "github.com/libp2p/go-libp2p/p2p/host/basic" @@ -102,7 +101,7 @@ type Config struct { EnableAutoRelay bool AutoNATConfig - StaticRelayOpt autorelay.StaticRelayOption + AutoRelayOpts []autorelay.Option EnableHolePunching bool HolePunchingOptions []holepunch.Option @@ -270,7 +269,7 @@ func (cfg *Config) NewNode() (host.Host, error) { } } - // Note: h.AddrsFactory may be changed by AutoRelay, but non-relay version is + // Note: h.AddrsFactory may be changed by relayFinder, but non-relay version is // used by AutoNAT below. var ar *autorelay.AutoRelay addrF := h.AddrsFactory @@ -280,22 +279,7 @@ func (cfg *Config) NewNode() (host.Host, error) { return nil, fmt.Errorf("cannot enable autorelay; relay is not enabled") } - var opts []autorelay.Option - if cfg.StaticRelayOpt != nil { - opts = append(opts, autorelay.Option(cfg.StaticRelayOpt)) - } else { - if router == nil { - h.Close() - return nil, fmt.Errorf("cannot enable autorelay; no routing for discovery") - } - crouter, ok := router.(routing.ContentRouting) - if !ok { - h.Close() - return nil, fmt.Errorf("cannot enable autorelay; no suitable routing for discovery") - } - opts = append(opts, autorelay.WithDiscoverer(drouting.NewRoutingDiscovery(crouter))) - } - ar, err = autorelay.NewAutoRelay(h, router, opts...) + ar, err = autorelay.NewAutoRelay(h, cfg.AutoRelayOpts...) if err != nil { return nil, err } diff --git a/options.go b/options.go index 6821f4ed4f..ce40fa5d34 100644 --- a/options.go +++ b/options.go @@ -250,15 +250,10 @@ func EnableRelayService(opts ...relayv2.Option) Option { // // This subsystem performs automatic address rewriting to advertise relay addresses when it // detects that the node is publicly unreachable (e.g. behind a NAT). -func EnableAutoRelay(opts ...autorelay.StaticRelayOption) Option { +func EnableAutoRelay(opts ...autorelay.Option) Option { return func(cfg *Config) error { - if len(opts) > 0 { - if len(opts) > 1 { - return errors.New("only expected a single static relay configuration option") - } - cfg.StaticRelayOpt = opts[0] - } cfg.EnableAutoRelay = true + cfg.AutoRelayOpts = opts return nil } } @@ -269,7 +264,7 @@ func EnableAutoRelay(opts ...autorelay.StaticRelayOption) Option { // Deprecated: pass an autorelay.WithStaticRelays option to EnableAutoRelay. func StaticRelays(relays []peer.AddrInfo) Option { return func(cfg *Config) error { - cfg.StaticRelayOpt = autorelay.WithStaticRelays(relays) + cfg.AutoRelayOpts = append(cfg.AutoRelayOpts, autorelay.WithStaticRelays(relays)) return nil } } diff --git a/p2p/host/autorelay/autorelay.go b/p2p/host/autorelay/autorelay.go new file mode 100644 index 0000000000..07aa0a8fb7 --- /dev/null +++ b/p2p/host/autorelay/autorelay.go @@ -0,0 +1,123 @@ +package autorelay + +import ( + "context" + "sync" + + basic "github.com/libp2p/go-libp2p/p2p/host/basic" + + "github.com/libp2p/go-libp2p-core/event" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + + logging "github.com/ipfs/go-log/v2" + ma "github.com/multiformats/go-multiaddr" +) + +var log = logging.Logger("autorelay") + +type AutoRelay struct { + refCount sync.WaitGroup + ctx context.Context + ctxCancel context.CancelFunc + + mx sync.Mutex + status network.Reachability + + relayFinder *relayFinder + + peerChanIn <-chan peer.AddrInfo // capacity 0 + peerChanOut chan peer.AddrInfo // capacity 20 + + host host.Host + addrsF basic.AddrsFactory +} + +func NewAutoRelay(bhost *basic.BasicHost, opts ...Option) (*AutoRelay, error) { + r := &AutoRelay{ + host: bhost, + addrsF: bhost.AddrsFactory, + status: network.ReachabilityUnknown, + } + r.ctx, r.ctxCancel = context.WithCancel(context.Background()) + conf := defaultConfig + for _, opt := range opts { + if err := opt(&conf); err != nil { + return nil, err + } + } + r.peerChanIn = conf.peerChan + r.peerChanOut = make(chan peer.AddrInfo, conf.maxCandidates) + r.relayFinder = newRelayFinder(bhost, r.peerChanOut, &conf) + bhost.AddrsFactory = r.hostAddrs + + r.refCount.Add(1) + go func() { + defer r.refCount.Done() + r.background() + }() + return r, nil +} + +func (r *AutoRelay) background() { + subReachability, err := r.host.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged)) + if err != nil { + log.Debug("failed to subscribe to the EvtLocalReachabilityChanged") + return + } + defer subReachability.Close() + + for { + select { + case <-r.ctx.Done(): + return + case ev, ok := <-subReachability.Out(): + if !ok { + return + } + // TODO: push changed addresses + evt := ev.(event.EvtLocalReachabilityChanged) + switch evt.Reachability { + case network.ReachabilityPrivate, network.ReachabilityUnknown: + if err := r.relayFinder.Start(); err != nil { + log.Error("failed to start relay finder") + } + case network.ReachabilityPublic: + r.relayFinder.Stop() + } + r.mx.Lock() + r.status = evt.Reachability + r.mx.Unlock() + case pi := <-r.peerChanIn: + select { + case r.peerChanOut <- pi: // if there's space in the channel, great + default: + // no space left in the channel. Drop the oldest entry. + <-r.peerChanOut + r.peerChanOut <- pi + } + } + } +} + +func (r *AutoRelay) hostAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { + return r.relayAddrs(r.addrsF(addrs)) +} + +func (r *AutoRelay) relayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { + r.mx.Lock() + defer r.mx.Unlock() + + if r.status != network.ReachabilityPrivate { + return addrs + } + return r.relayFinder.relayAddrs(addrs) +} + +func (r *AutoRelay) Close() error { + r.ctxCancel() + err := r.relayFinder.Stop() + r.refCount.Wait() + return err +} diff --git a/p2p/host/autorelay/autorelay_test.go b/p2p/host/autorelay/autorelay_test.go index 1c836d720c..7ead52b296 100644 --- a/p2p/host/autorelay/autorelay_test.go +++ b/p2p/host/autorelay/autorelay_test.go @@ -1,132 +1,72 @@ package autorelay_test import ( - "context" - "net" "strings" - "sync" + "sync/atomic" "testing" "time" "github.com/libp2p/go-libp2p" - discovery "github.com/libp2p/go-libp2p/p2p/discovery/routing" "github.com/libp2p/go-libp2p/p2p/host/autorelay" - relayv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay" + circuitv2_proto "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto" relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay" - "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/routing" - "github.com/ipfs/go-cid" ma "github.com/multiformats/go-multiaddr" - manet "github.com/multiformats/go-multiaddr/net" "github.com/stretchr/testify/require" ) -// test specific parameters -func init() { - autorelay.BootDelay = 1 * time.Second - autorelay.AdvertiseBootDelay = 100 * time.Millisecond -} - -// mock routing -type mockRoutingTable struct { - mx sync.Mutex - providers map[string]map[peer.ID]peer.AddrInfo - peers map[peer.ID]peer.AddrInfo -} - -func (t *mockRoutingTable) NumPeers() int { - t.mx.Lock() - defer t.mx.Unlock() - return len(t.peers) -} - -type mockRouting struct { - h host.Host - tab *mockRoutingTable -} - -func newMockRoutingTable() *mockRoutingTable { - return &mockRoutingTable{providers: make(map[string]map[peer.ID]peer.AddrInfo)} -} - -func newMockRouting(h host.Host, tab *mockRoutingTable) *mockRouting { - return &mockRouting{h: h, tab: tab} -} - -func (m *mockRouting) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) { - m.tab.mx.Lock() - defer m.tab.mx.Unlock() - pi, ok := m.tab.peers[p] - if !ok { - return peer.AddrInfo{}, routing.ErrNotFound - } - return pi, nil +func isRelayAddr(a ma.Multiaddr) (isRelay bool) { + ma.ForEach(a, func(c ma.Component) bool { + switch c.Protocol().Code { + case ma.P_CIRCUIT: + isRelay = true + return false + default: + return true + } + }) + return isRelay } -func (m *mockRouting) Provide(ctx context.Context, cid cid.Cid, bcast bool) error { - m.tab.mx.Lock() - defer m.tab.mx.Unlock() - - pmap, ok := m.tab.providers[cid.String()] - if !ok { - pmap = make(map[peer.ID]peer.AddrInfo) - m.tab.providers[cid.String()] = pmap - } - - pi := peer.AddrInfo{ID: m.h.ID(), Addrs: m.h.Addrs()} - pmap[m.h.ID()] = pi - if m.tab.peers == nil { - m.tab.peers = make(map[peer.ID]peer.AddrInfo) - } - m.tab.peers[m.h.ID()] = pi - - return nil +func newPrivateNode(t *testing.T, opts ...autorelay.Option) host.Host { + t.Helper() + h, err := libp2p.New( + libp2p.ForceReachabilityPrivate(), + libp2p.EnableAutoRelay(opts...), + ) + require.NoError(t, err) + return h } -func (m *mockRouting) FindProvidersAsync(ctx context.Context, cid cid.Cid, limit int) <-chan peer.AddrInfo { - ch := make(chan peer.AddrInfo) - go func() { - defer close(ch) - m.tab.mx.Lock() - defer m.tab.mx.Unlock() - - pmap, ok := m.tab.providers[cid.String()] - if !ok { - return - } - - for _, pi := range pmap { - select { - case ch <- pi: - case <-ctx.Done(): - return +func newRelay(t *testing.T) host.Host { + t.Helper() + h, err := libp2p.New( + libp2p.DisableRelay(), + libp2p.EnableRelayService(), + libp2p.ForceReachabilityPublic(), + libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr { + for i, addr := range addrs { + saddr := addr.String() + if strings.HasPrefix(saddr, "/ip4/127.0.0.1/") { + addrNoIP := strings.TrimPrefix(saddr, "/ip4/127.0.0.1") + addrs[i] = ma.StringCast("/dns4/localhost" + addrNoIP) + } } - } - }() - - return ch -} - -func connect(t *testing.T, a, b host.Host) { - pinfo := peer.AddrInfo{ID: a.ID(), Addrs: a.Addrs()} - require.NoError(t, b.Connect(context.Background(), pinfo)) + return addrs + }), + ) + require.NoError(t, err) + return h } -// and the actual test! -func TestAutoRelay(t *testing.T) { - private4 := manet.Private4 - t.Cleanup(func() { manet.Private4 = private4 }) - manet.Private4 = []*net.IPNet{} - - // this is the relay host - // announce dns addrs because filter out private addresses from relays, - // and we consider dns addresses "public". - relayHost, err := libp2p.New( +// creates a node that speaks the relay v2 protocol, but doesn't accept any reservations for the first workAfter tries +func newBrokenRelay(t *testing.T, workAfter int) host.Host { + t.Helper() + h, err := libp2p.New( libp2p.DisableRelay(), libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr { for i, addr := range addrs { @@ -137,87 +77,124 @@ func TestAutoRelay(t *testing.T) { } } return addrs - })) + }), + libp2p.EnableRelayService(), + ) require.NoError(t, err) - defer relayHost.Close() - - t.Run("with a circuitv1 relay", func(t *testing.T) { - r, err := relayv1.NewRelay(relayHost) - require.NoError(t, err) - defer r.Close() - testAutoRelay(t, relayHost) - }) - t.Run("testing autorelay with circuitv2 relay", func(t *testing.T) { - r, err := relayv2.New(relayHost) - require.NoError(t, err) - defer r.Close() - testAutoRelay(t, relayHost) + var n int32 + h.SetStreamHandler(circuitv2_proto.ProtoIDv2Hop, func(str network.Stream) { + t.Log("rejecting reservation") + str.Reset() + num := atomic.AddInt32(&n, 1) + if int(num) >= workAfter { + h.RemoveStreamHandler(circuitv2_proto.ProtoIDv2Hop) + r, err := relayv2.New(h) + require.NoError(t, err) + t.Cleanup(func() { r.Close() }) + } }) + return h } -func isRelayAddr(addr ma.Multiaddr) bool { - _, err := addr.ValueForProtocol(ma.P_CIRCUIT) - return err == nil +func TestSingleRelay(t *testing.T) { + const numPeers = 5 + peerChan := make(chan peer.AddrInfo) + done := make(chan struct{}) + go func() { + defer close(done) + for i := 0; i < numPeers; i++ { + r := newRelay(t) + t.Cleanup(func() { r.Close() }) + peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()} + } + }() + h := newPrivateNode(t, + autorelay.WithPeerSource(peerChan), + autorelay.WithNumCandidates(1), + autorelay.WithNumRelays(99999), + autorelay.WithBootDelay(0), + ) + defer h.Close() + + require.Eventually(t, func() bool { + return len(ma.FilterAddrs(h.Addrs(), isRelayAddr)) > 0 + }, 3*time.Second, 100*time.Millisecond) + <-done + // test that we don't add any more relays + require.Never(t, func() bool { + return len(ma.FilterAddrs(h.Addrs(), isRelayAddr)) != 1 + }, 200*time.Millisecond, 50*time.Millisecond) } -func testAutoRelay(t *testing.T, relayHost host.Host) { - mtab := newMockRoutingTable() - makeRouting := func(h host.Host) (*mockRouting, error) { - mr := newMockRouting(h, mtab) - return mr, nil - } - makePeerRouting := func(h host.Host) (routing.PeerRouting, error) { - return makeRouting(h) - } - - // advertise the relay - relayRouting, err := makeRouting(relayHost) - require.NoError(t, err) - relayDiscovery := discovery.NewRoutingDiscovery(relayRouting) - autorelay.Advertise(context.Background(), relayDiscovery) - require.Eventually(t, func() bool { return mtab.NumPeers() > 0 }, time.Second, 10*time.Millisecond) - - // the client hosts - h1, err := libp2p.New(libp2p.EnableRelay()) - require.NoError(t, err) - defer h1.Close() +func TestWaitForCandidates(t *testing.T) { + peerChan := make(chan peer.AddrInfo) + h := newPrivateNode(t, + autorelay.WithPeerSource(peerChan), + autorelay.WithMinCandidates(2), + autorelay.WithNumRelays(1), + autorelay.WithBootDelay(time.Hour), + ) + defer h.Close() + + r1 := newRelay(t) + t.Cleanup(func() { r1.Close() }) + peerChan <- peer.AddrInfo{ID: r1.ID(), Addrs: r1.Addrs()} + + // make sure we don't add any relays yet + // We need to wait until we have at least 2 candidates before we connect. + require.Never(t, func() bool { + return len(ma.FilterAddrs(h.Addrs(), isRelayAddr)) > 0 + }, 200*time.Millisecond, 50*time.Millisecond) + + r2 := newRelay(t) + t.Cleanup(func() { r2.Close() }) + peerChan <- peer.AddrInfo{ID: r2.ID(), Addrs: r2.Addrs()} + require.Eventually(t, func() bool { + return len(ma.FilterAddrs(h.Addrs(), isRelayAddr)) > 0 + }, 3*time.Second, 100*time.Millisecond) +} - h2, err := libp2p.New(libp2p.EnableRelay(), libp2p.EnableAutoRelay(), libp2p.Routing(makePeerRouting)) - require.NoError(t, err) - defer h2.Close() +func TestBackoff(t *testing.T) { + peerChan := make(chan peer.AddrInfo) + h := newPrivateNode(t, + autorelay.WithPeerSource(peerChan), + autorelay.WithNumRelays(1), + autorelay.WithBootDelay(0), + autorelay.WithBackoff(500*time.Millisecond), + ) + defer h.Close() + + r1 := newBrokenRelay(t, 1) + t.Cleanup(func() { r1.Close() }) + peerChan <- peer.AddrInfo{ID: r1.ID(), Addrs: r1.Addrs()} + + // make sure we don't add any relays yet + require.Never(t, func() bool { + return len(ma.FilterAddrs(h.Addrs(), isRelayAddr)) > 0 + }, 400*time.Millisecond, 50*time.Millisecond) + + require.Eventually(t, func() bool { + return len(ma.FilterAddrs(h.Addrs(), isRelayAddr)) > 0 + }, 400*time.Millisecond, 50*time.Millisecond) +} - // verify that we don't advertise relay addrs initially - for _, addr := range h2.Addrs() { - if isRelayAddr(addr) { - t.Fatal("relay addr advertised before auto detection") - } - } - - // connect to AutoNAT, have it resolve to private. - connect(t, h1, h2) - privEmitter, _ := h2.EventBus().Emitter(new(event.EvtLocalReachabilityChanged)) - privEmitter.Emit(event.EvtLocalReachabilityChanged{Reachability: network.ReachabilityPrivate}) - - hasRelayAddrs := func(t *testing.T, addrs []ma.Multiaddr) bool { - unspecificRelay := ma.StringCast("/p2p-circuit") - for _, addr := range addrs { - if addr.Equal(unspecificRelay) { - t.Fatal("unspecific relay addr advertised") - } - if isRelayAddr(addr) { - return true - } - } - return false - } - // Wait for detection to do its magic - require.Eventually(t, func() bool { return hasRelayAddrs(t, h2.Addrs()) }, 3*time.Second, 10*time.Millisecond) - // verify that we have pushed relay addrs to connected peers - require.Eventually(t, func() bool { return hasRelayAddrs(t, h1.Peerstore().Addrs(h2.ID())) }, time.Second, 10*time.Millisecond, "no relay addrs pushed") - - // verify that we can connect through the relay - h3, err := libp2p.New(libp2p.EnableRelay()) - require.NoError(t, err) - defer h3.Close() - require.NoError(t, h3.Connect(context.Background(), peer.AddrInfo{ID: h2.ID(), Addrs: ma.FilterAddrs(h2.Addrs(), isRelayAddr)})) +func TestMaxBackoffs(t *testing.T) { + peerChan := make(chan peer.AddrInfo) + h := newPrivateNode(t, + autorelay.WithPeerSource(peerChan), + autorelay.WithNumRelays(1), + autorelay.WithBootDelay(0), + autorelay.WithBackoff(25*time.Millisecond), + autorelay.WithMaxAttempts(3), + ) + defer h.Close() + + r1 := newBrokenRelay(t, 4) + t.Cleanup(func() { r1.Close() }) + peerChan <- peer.AddrInfo{ID: r1.ID(), Addrs: r1.Addrs()} + + // make sure we don't add any relays yet + require.Never(t, func() bool { + return len(ma.FilterAddrs(h.Addrs(), isRelayAddr)) > 0 + }, 300*time.Millisecond, 50*time.Millisecond) } diff --git a/p2p/host/autorelay/doc.go b/p2p/host/autorelay/doc.go deleted file mode 100644 index 4955dc5e1f..0000000000 --- a/p2p/host/autorelay/doc.go +++ /dev/null @@ -1,28 +0,0 @@ -/* -The relay package contains the components necessary to implement the "autorelay" -feature. - -Warning: the internal interfaces are unstable. - -System Components: -- A discovery service to discover public relays. -- An AutoNAT client used to determine if the node is behind a NAT/firewall. -- One or more autonat services, instances of `AutoNATServices`. These are used - by the autonat client. -- One or more relays, instances of `RelayHost`. -- The AutoRelay service. This is the service that actually: - -AutoNATService: https://github.com/libp2p/go-libp2p-autonat-svc -AutoNAT: https://github.com/libp2p/go-libp2p/p2p/host/autonat - -How it works: -- `AutoNATService` instances are instantiated in the bootstrappers (or other - well known publicly reachable hosts) -- `AutoRelay`s are constructed with `libp2p.New(libp2p.Routing(makeDHT))` - They passively discover autonat service instances and test dialability of - their listen address set through them. When the presence of NAT is detected, - they discover relays through the DHT, connect to some of them and begin - advertising relay addresses. The new set of addresses is propagated to - connected peers through the `identify/push` protocol. -*/ -package autorelay diff --git a/p2p/host/autorelay/log.go b/p2p/host/autorelay/log.go deleted file mode 100644 index 9c4e5ed52c..0000000000 --- a/p2p/host/autorelay/log.go +++ /dev/null @@ -1,7 +0,0 @@ -package autorelay - -import ( - logging "github.com/ipfs/go-log/v2" -) - -var log = logging.Logger("autorelay") diff --git a/p2p/host/autorelay/options.go b/p2p/host/autorelay/options.go new file mode 100644 index 0000000000..e6cd0dc817 --- /dev/null +++ b/p2p/host/autorelay/options.go @@ -0,0 +1,135 @@ +package autorelay + +import ( + "errors" + "fmt" + "time" + + "github.com/libp2p/go-libp2p-core/peer" +) + +type config struct { + peerChan <-chan peer.AddrInfo + staticRelays []peer.AddrInfo + // see WithMinCandidates + minCandidates int + // see WithNumCandidates + maxCandidates int + // Delay until we obtain reservations with relays, if we have less than minCandidates candidates. + // See WithBootDelay. + bootDelay time.Duration + // backoff is the time we wait after failing to obtain a reservation with a candidate + backoff time.Duration + // If we fail to obtain a reservation more than maxAttempts, we stop trying. + maxAttempts int + // Number of relays we strive to obtain a reservation with. + desiredRelays int +} + +var defaultConfig = config{ + minCandidates: 4, + maxCandidates: 20, + bootDelay: 3 * time.Minute, + backoff: time.Hour, + maxAttempts: 3, + desiredRelays: 2, +} + +// DefaultRelays are the known PL-operated v1 relays; will be decommissioned in 2022. +var DefaultRelays = []string{ + "/ip4/147.75.80.110/tcp/4001/p2p/QmbFgm5zan8P6eWWmeyfncR5feYEMPbht5b1FW1C37aQ7y", + "/ip4/147.75.80.110/udp/4001/quic/p2p/QmbFgm5zan8P6eWWmeyfncR5feYEMPbht5b1FW1C37aQ7y", + "/ip4/147.75.195.153/tcp/4001/p2p/QmW9m57aiBDHAkKj9nmFSEn7ZqrcF1fZS4bipsTCHburei", + "/ip4/147.75.195.153/udp/4001/quic/p2p/QmW9m57aiBDHAkKj9nmFSEn7ZqrcF1fZS4bipsTCHburei", + "/ip4/147.75.70.221/tcp/4001/p2p/Qme8g49gm3q4Acp7xWBKg3nAa9fxZ1YmyDJdyGgoG6LsXh", + "/ip4/147.75.70.221/udp/4001/quic/p2p/Qme8g49gm3q4Acp7xWBKg3nAa9fxZ1YmyDJdyGgoG6LsXh", +} + +var defaultStaticRelays []peer.AddrInfo + +func init() { + for _, s := range DefaultRelays { + pi, err := peer.AddrInfoFromString(s) + if err != nil { + panic(fmt.Sprintf("failed to initialize default static relays: %s", err)) + } + defaultStaticRelays = append(defaultStaticRelays, *pi) + } +} + +type Option func(*config) error + +func WithStaticRelays(static []peer.AddrInfo) Option { + return func(r *config) error { + if len(r.staticRelays) > 0 { + return errors.New("can't set static relays, static relays already configured") + } + r.staticRelays = static + return nil + } +} + +func WithDefaultStaticRelays() Option { + return WithStaticRelays(defaultStaticRelays) +} + +func WithPeerSource(peerChan <-chan peer.AddrInfo) Option { + return func(c *config) error { + c.peerChan = peerChan + return nil + } +} + +// WithNumRelays sets the number of relays we strive to obtain reservations with. +func WithNumRelays(n int) Option { + return func(c *config) error { + c.desiredRelays = n + return nil + } +} + +// WithNumCandidates sets the number of relay candidates that we buffer. +func WithNumCandidates(n int) Option { + return func(c *config) error { + c.maxCandidates = n + return nil + } +} + +// WithMinCandidates sets the minimum number of relay candidates we collect before to get a reservation +// with any of them (unless we've been running for longer than the boot delay). +// This is to make sure that we don't just randomly connect to the first candidate that we discover. +func WithMinCandidates(n int) Option { + return func(c *config) error { + c.minCandidates = n + return nil + } +} + +// WithBootDelay set the boot delay for finding relays. +// We won't attempt any reservation if we've have less than a minimum number of candidates. +// This prevents us to connect to the "first best" relay, and allows us to carefully select the relay. +// However, in case we haven't found enough relays after the boot delay, we use what we have. +func WithBootDelay(d time.Duration) Option { + return func(c *config) error { + c.bootDelay = d + return nil + } +} + +// WithBackoff sets the time we wait after failing to obtain a reservation with a candidate. +func WithBackoff(d time.Duration) Option { + return func(c *config) error { + c.backoff = d + return nil + } +} + +// WithMaxAttempts sets the number of times we attempt to obtain a reservation with a candidate. +// If we fail still fail to obtain a reservation, this candidate is dropped. +func WithMaxAttempts(n int) Option { + return func(c *config) error { + c.maxAttempts = n + return nil + } +} diff --git a/p2p/host/autorelay/relay.go b/p2p/host/autorelay/relay.go index 4d7fb0adee..db0d97ec01 100644 --- a/p2p/host/autorelay/relay.go +++ b/p2p/host/autorelay/relay.go @@ -1,55 +1,9 @@ package autorelay import ( - "context" - "time" - - "github.com/libp2p/go-libp2p-core/discovery" - ma "github.com/multiformats/go-multiaddr" ) -var ( - // this is purposefully long to require some node stability before advertising as a relay - AdvertiseBootDelay = 15 * time.Minute - AdvertiseTTL = 30 * time.Minute -) - -// Advertise advertises this node as a libp2p relay. -func Advertise(ctx context.Context, advertise discovery.Advertiser) { - go func() { - select { - case <-time.After(AdvertiseBootDelay): - go func() { - for { - ttl, err := advertise.Advertise(ctx, RelayRendezvous, discovery.TTL(AdvertiseTTL)) - if err != nil { - log.Debugf("Error advertising %s: %s", RelayRendezvous, err.Error()) - if ctx.Err() != nil { - return - } - - select { - case <-time.After(2 * time.Minute): - continue - case <-ctx.Done(): - return - } - } - - wait := 7 * ttl / 8 - select { - case <-time.After(wait): - case <-ctx.Done(): - return - } - } - }() - case <-ctx.Done(): - } - }() -} - // Filter filters out all relay addresses. func Filter(addrs []ma.Multiaddr) []ma.Multiaddr { raddrs := make([]ma.Multiaddr, 0, len(addrs)) diff --git a/p2p/host/autorelay/relay_finder.go b/p2p/host/autorelay/relay_finder.go index 7f81fa954b..4bb5c20c70 100644 --- a/p2p/host/autorelay/relay_finder.go +++ b/p2p/host/autorelay/relay_finder.go @@ -6,157 +6,116 @@ import ( "fmt" "math/rand" "sync" - "sync/atomic" "time" "golang.org/x/sync/errgroup" - "github.com/libp2p/go-libp2p-core/discovery" - "github.com/libp2p/go-libp2p-core/event" - "github.com/libp2p/go-libp2p-core/network" - "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/routing" - basic "github.com/libp2p/go-libp2p/p2p/host/basic" relayv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay" circuitv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client" circuitv2_proto "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto" + "github.com/libp2p/go-libp2p-core/event" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" ) const ( - RelayRendezvous = "/libp2p/relay" - - rsvpRefreshInterval = time.Minute - rsvpExpirationSlack = 2 * time.Minute - - autorelayTag = "autorelay" - protoIDv1 = string(relayv1.ProtoID) protoIDv2 = string(circuitv2_proto.ProtoIDv2Hop) ) -var ( - DesiredRelays = 1 - - BootDelay = 20 * time.Second -) - -// DefaultRelays are the known PL-operated v1 relays; will be decommissioned in 2022. -var DefaultRelays = []string{ - "/ip4/147.75.80.110/tcp/4001/p2p/QmbFgm5zan8P6eWWmeyfncR5feYEMPbht5b1FW1C37aQ7y", - "/ip4/147.75.80.110/udp/4001/quic/p2p/QmbFgm5zan8P6eWWmeyfncR5feYEMPbht5b1FW1C37aQ7y", - "/ip4/147.75.195.153/tcp/4001/p2p/QmW9m57aiBDHAkKj9nmFSEn7ZqrcF1fZS4bipsTCHburei", - "/ip4/147.75.195.153/udp/4001/quic/p2p/QmW9m57aiBDHAkKj9nmFSEn7ZqrcF1fZS4bipsTCHburei", - "/ip4/147.75.70.221/tcp/4001/p2p/Qme8g49gm3q4Acp7xWBKg3nAa9fxZ1YmyDJdyGgoG6LsXh", - "/ip4/147.75.70.221/udp/4001/quic/p2p/Qme8g49gm3q4Acp7xWBKg3nAa9fxZ1YmyDJdyGgoG6LsXh", -} - -var defaultStaticRelays []peer.AddrInfo +// Terminology: +// Candidate: Once we connect to a node and it supports (v1 / v2) relay protocol, +// we call it a candidate, and consider using it as a relay. +// Relay: Out of the list of candidates, we select a relay to connect to. +// Currently, we just randomly select a candidate, but we can employ more sophisticated +// selection strategies here (e.g. by facotring in the RTT). -func init() { - for _, s := range DefaultRelays { - pi, err := peer.AddrInfoFromString(s) - if err != nil { - panic(fmt.Sprintf("failed to initialize default static relays: %s", err)) - } - defaultStaticRelays = append(defaultStaticRelays, *pi) - } -} - -type Option func(*AutoRelay) error -type StaticRelayOption Option +const ( + rsvpRefreshInterval = time.Minute + rsvpExpirationSlack = 2 * time.Minute -func WithStaticRelays(static []peer.AddrInfo) StaticRelayOption { - return func(r *AutoRelay) error { - if len(r.static) > 0 { - return errors.New("can't set static relays, static relays already configured") - } - r.static = static - return nil - } -} + autorelayTag = "autorelay" +) -func WithDefaultStaticRelays() StaticRelayOption { - return WithStaticRelays(defaultStaticRelays) +type candidate struct { + added time.Time + isRelayV1 bool + ai peer.AddrInfo + numAttempts int } -func WithDiscoverer(discover discovery.Discoverer) Option { - return func(r *AutoRelay) error { - r.discover = discover - return nil - } +type candidateOnBackoff struct { + candidate + nextConnAttempt time.Time } -// AutoRelay is a Host that uses relays for connectivity when a NAT is detected. -type AutoRelay struct { +// relayFinder is a Host that uses relays for connectivity when a NAT is detected. +type relayFinder struct { + bootTime time.Time host *basic.BasicHost - discover discovery.Discoverer - router routing.PeerRouting - addrsF basic.AddrsFactory - static []peer.AddrInfo + conf *config refCount sync.WaitGroup ctxCancel context.CancelFunc - relayFound chan struct{} - findRelaysRunning int32 // to be used as an atomic + peerChan <-chan peer.AddrInfo - mx sync.Mutex - relays map[peer.ID]*circuitv2.Reservation // rsvp will be nil if it is a v1 relay - status network.Reachability + candidateFound chan struct{} // receives every time we find a new relay candidate + candidateMx sync.Mutex + candidates map[peer.ID]*candidate + candidatesOnBackoff []*candidateOnBackoff // this slice is always sorted by the nextConnAttempt time + + relayUpdated chan struct{} + + relayMx sync.Mutex + relays map[peer.ID]*circuitv2.Reservation // rsvp will be nil if it is a v1 relay cachedAddrs []ma.Multiaddr cachedAddrsExpiry time.Time } -func NewAutoRelay(bhost *basic.BasicHost, router routing.PeerRouting, opts ...Option) (*AutoRelay, error) { - ctx, cancel := context.WithCancel(context.Background()) - ar := &AutoRelay{ - ctxCancel: cancel, - host: bhost, - router: router, - addrsF: bhost.AddrsFactory, - relays: make(map[peer.ID]*circuitv2.Reservation), - relayFound: make(chan struct{}, 1), - status: network.ReachabilityUnknown, - } - for _, opt := range opts { - if err := opt(ar); err != nil { - return nil, err - } - } - bhost.AddrsFactory = ar.hostAddrs - ar.refCount.Add(1) - go ar.background(ctx) - return ar, nil +func newRelayFinder(host *basic.BasicHost, peerChan <-chan peer.AddrInfo, conf *config) *relayFinder { + r := &relayFinder{ + bootTime: time.Now(), + host: host, + conf: conf, + peerChan: peerChan, + candidates: make(map[peer.ID]*candidate), + candidateFound: make(chan struct{}, 1), + relays: make(map[peer.ID]*circuitv2.Reservation), + relayUpdated: make(chan struct{}, 1), + } + return r } -func (ar *AutoRelay) hostAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { - return ar.relayAddrs(ar.addrsF(addrs)) -} - -func (ar *AutoRelay) background(ctx context.Context) { - defer ar.refCount.Done() - - subReachability, err := ar.host.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged)) - if err != nil { - log.Error("failed to subscribe to the EvtLocalReachabilityChanged") - return +func (rf *relayFinder) background(ctx context.Context) { + if len(rf.conf.staticRelays) == 0 { + rf.refCount.Add(1) + go func() { + defer rf.refCount.Done() + rf.findNodes(ctx) + }() } - defer subReachability.Close() - subConnectedness, err := ar.host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged)) + + subConnectedness, err := rf.host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged)) if err != nil { log.Error("failed to subscribe to the EvtPeerConnectednessChanged") return } defer subConnectedness.Close() - ticker := time.NewTicker(rsvpRefreshInterval) - defer ticker.Stop() + bootDelayTimer := time.NewTimer(rf.conf.bootDelay) + defer bootDelayTimer.Stop() + refreshTicker := time.NewTicker(rsvpRefreshInterval) + defer refreshTicker.Stop() + backoffTicker := time.NewTicker(rf.conf.backoff / 5) + defer backoffTicker.Stop() for { // when true, we need to identify push @@ -168,313 +127,355 @@ func (ar *AutoRelay) background(ctx context.Context) { return } evt := ev.(event.EvtPeerConnectednessChanged) - switch evt.Connectedness { - case network.Connected: - // If we just connect to one of our static relays, get a reservation immediately. - for _, pi := range ar.static { - if pi.ID == evt.Peer { - rsvp, ok := ar.tryRelay(ctx, pi) - if ok { - ar.mx.Lock() - ar.relays[pi.ID] = rsvp - ar.mx.Unlock() - } - push = true - break - } - } - case network.NotConnected: - ar.mx.Lock() - if ar.usingRelay(evt.Peer) { // we were disconnected from a relay - delete(ar.relays, evt.Peer) - push = true - } - ar.mx.Unlock() - } - case ev, ok := <-subReachability.Out(): - if !ok { - return + if evt.Connectedness != network.NotConnected { + continue } - evt := ev.(event.EvtLocalReachabilityChanged) - - if evt.Reachability == network.ReachabilityPrivate { - // findRelays is a long-lived task (runs up to 2.5 minutes) - // Make sure we only start it once. - if atomic.CompareAndSwapInt32(&ar.findRelaysRunning, 0, 1) { - go func() { - defer atomic.StoreInt32(&ar.findRelaysRunning, 0) - ar.findRelays(ctx) - }() - } - } - - ar.mx.Lock() - // if our reachability changed - if ar.status != evt.Reachability && evt.Reachability != network.ReachabilityUnknown { + rf.relayMx.Lock() + if rf.usingRelay(evt.Peer) { // we were disconnected from a relay + log.Debugw("disconnected from relay", "id", evt.Peer) + delete(rf.relays, evt.Peer) push = true } - ar.status = evt.Reachability - ar.mx.Unlock() - case <-ar.relayFound: + rf.relayMx.Unlock() + case <-rf.candidateFound: + log.Debugf("candidate found") + rf.handleNewCandidate(ctx) + case <-bootDelayTimer.C: + log.Debugf("boot delay timer") + rf.handleNewCandidate(ctx) + case <-rf.relayUpdated: push = true - case now := <-ticker.C: - push = ar.refreshReservations(ctx, now) + case now := <-refreshTicker.C: + push = rf.refreshReservations(ctx, now) + case now := <-backoffTicker.C: + rf.checkForCandidatesOnBackoff(now) case <-ctx.Done(): return } if push { - ar.mx.Lock() - ar.cachedAddrs = nil - ar.mx.Unlock() - ar.host.SignalAddressChange() + rf.relayMx.Lock() + rf.cachedAddrs = nil + rf.relayMx.Unlock() + rf.host.SignalAddressChange() } } } -func (ar *AutoRelay) refreshReservations(ctx context.Context, now time.Time) bool { - ar.mx.Lock() - if ar.status == network.ReachabilityPublic { - // we are public, forget about the relays, unprotect peers - for p := range ar.relays { - ar.host.ConnManager().Unprotect(p, autorelayTag) - delete(ar.relays, p) +// findNodes accepts nodes from the channel and tests if they support relaying. +// It is run on both public and private nodes (but not when static relays are set). +// It garbage collects old entries, so that nodes doesn't overflow. +// This makes sure that as soon as we need to find relay candidates, we have them available. +func (rf *relayFinder) findNodes(ctx context.Context) { + for { + select { + case pi := <-rf.peerChan: + log.Debugw("found node", "id", pi.ID) + rf.candidateMx.Lock() + numCandidates := len(rf.candidates) + rf.candidateMx.Unlock() + if numCandidates >= rf.conf.maxCandidates { + log.Debugw("skipping node. Already have enough candidates", "id", pi.ID, "num", numCandidates, "max", rf.conf.maxCandidates) + continue + } + rf.refCount.Add(1) + go func() { + defer rf.refCount.Done() + rf.handleNewNode(ctx, pi) + }() + case <-ctx.Done(): + return } - - ar.mx.Unlock() - return true } +} - if len(ar.relays) == 0 { - ar.mx.Unlock() - return false +// handleNewNode tests if a peer supports circuit v1 or v2. +// This method is only run on private nodes. +// If a peer does, it is added to the candidates map. +// Note that just supporting the protocol doesn't guarantee that we can also obtain a reservation. +func (rf *relayFinder) handleNewNode(ctx context.Context, pi peer.AddrInfo) { + rf.relayMx.Lock() + relayInUse := rf.usingRelay(pi.ID) + rf.relayMx.Unlock() + if relayInUse { + return } - // find reservations about to expire and refresh them in parallel - g := new(errgroup.Group) - for p, rsvp := range ar.relays { - if rsvp == nil { - // this is a circuitv1 relay, there is no reservation - continue - } - if now.Add(rsvpExpirationSlack).Before(rsvp.Expiration) { - continue - } - - p := p - g.Go(func() error { return ar.refreshRelayReservation(ctx, p) }) + ctx, cancel := context.WithTimeout(ctx, 20*time.Second) + defer cancel() + supportsV1, err := rf.tryNode(ctx, pi) + if err != nil { + log.Debugf("node %s not accepted as a candidate: %s", pi.ID, err) + return } - ar.mx.Unlock() + rf.candidateMx.Lock() + if len(rf.candidates) > rf.conf.maxCandidates { + rf.candidateMx.Unlock() + return + } + log.Debugw("node supports relay protocol", "peer", pi.ID, "supports circuitv2", !supportsV1) + rf.candidates[pi.ID] = &candidate{ai: pi, isRelayV1: supportsV1} + rf.candidateMx.Unlock() - err := g.Wait() - return err != nil + rf.notifyNewCandidate() } -func (ar *AutoRelay) refreshRelayReservation(ctx context.Context, p peer.ID) error { - rsvp, err := circuitv2.Reserve(ctx, ar.host, peer.AddrInfo{ID: p}) - - ar.mx.Lock() - defer ar.mx.Unlock() - - if err != nil { - log.Debugf("failed to refresh relay slot reservation with %s: %s", p, err) +func (rf *relayFinder) notifyNewCandidate() { + select { + case rf.candidateFound <- struct{}{}: + default: + } +} - delete(ar.relays, p) - // unprotect the connection - ar.host.ConnManager().Unprotect(p, autorelayTag) - } else { - log.Debugf("refreshed relay slot reservation with %s", p) - ar.relays[p] = rsvp +// tryNode checks if a peer actually supports either circuit v1 or circuit v2. +// It does not modify any internal state. +func (rf *relayFinder) tryNode(ctx context.Context, pi peer.AddrInfo) (supportsRelayV1 bool, err error) { + if err := rf.host.Connect(ctx, pi); err != nil { + return false, fmt.Errorf("error connecting to relay %s: %w", pi.ID, err) } - return err -} + conns := rf.host.Network().ConnsToPeer(pi.ID) + for _, conn := range conns { + if isRelayAddr(conn.RemoteMultiaddr()) { + return false, errors.New("not a public node") + } + } -func (ar *AutoRelay) findRelays(ctx context.Context) { - timer := time.NewTimer(30 * time.Second) - defer timer.Stop() - for retry := 0; retry < 5; retry++ { - if retry > 0 { - log.Debug("no relays connected; retrying in 30s") + // wait for identify to complete in at least one conn so that we can check the supported protocols + ready := make(chan struct{}, 1) + for _, conn := range conns { + go func(conn network.Conn) { select { - case <-timer.C: + case <-rf.host.IDService().IdentifyWait(conn): + select { + case ready <- struct{}{}: + default: + } case <-ctx.Done(): - return } - } + }(conn) + } - if foundAtLeastOneRelay := ar.findRelaysOnce(ctx); foundAtLeastOneRelay { - return - } + select { + case <-ready: + case <-ctx.Done(): + return false, ctx.Err() } -} -func (ar *AutoRelay) findRelaysOnce(ctx context.Context) bool { - relays, err := ar.discoverRelays(ctx) + protos, err := rf.host.Peerstore().SupportsProtocols(pi.ID, protoIDv1, protoIDv2) if err != nil { - log.Debugf("error discovering relays: %s", err) - return false + return false, fmt.Errorf("error checking relay protocol support for peer %s: %w", pi.ID, err) } - log.Debugf("discovered %d relays", len(relays)) - relays = ar.selectRelays(ctx, relays) - log.Debugf("selected %d relays", len(relays)) - - var found bool - for _, pi := range relays { - ar.mx.Lock() - relayInUse := ar.usingRelay(pi.ID) - ar.mx.Unlock() - if relayInUse { - continue - } - rsvp, ok := ar.tryRelay(ctx, pi) - if !ok { - continue - } - // make sure we're still connected. - if ar.host.Network().Connectedness(pi.ID) != network.Connected { - continue - } - found = true - ar.mx.Lock() - ar.relays[pi.ID] = rsvp - // protect the connection - ar.host.ConnManager().Protect(pi.ID, autorelayTag) - numRelays := len(ar.relays) - ar.mx.Unlock() - - if numRelays >= DesiredRelays { - break + + // If the node speaks both, prefer circuit v2 + var supportsV1, supportsV2 bool + for _, proto := range protos { + switch proto { + case protoIDv1: + supportsV1 = true + case protoIDv2: + supportsV2 = true } } - if found { - ar.relayFound <- struct{}{} - return true + if !supportsV1 && !supportsV2 { + return false, errors.New("doesn't speak circuit v1 or v2") } - return false -} - -// usingRelay returns if we're currently using the given relay. -func (ar *AutoRelay) usingRelay(p peer.ID) bool { - _, ok := ar.relays[p] - return ok + return supportsV1, nil } -// addRelay adds the given relay to our set of relays. -// returns true when we add a new relay -func (ar *AutoRelay) tryRelay(ctx context.Context, pi peer.AddrInfo) (*circuitv2.Reservation, bool) { - if !ar.connect(ctx, pi) { - return nil, false +func (rf *relayFinder) handleNewCandidate(ctx context.Context) { + rf.relayMx.Lock() + defer rf.relayMx.Unlock() + if len(rf.candidates) == 0 { + return } - - protos, err := ar.host.Peerstore().SupportsProtocols(pi.ID, protoIDv1, protoIDv2) - if err != nil { - log.Debugf("error checking relay protocol support for peer %s: %s", pi.ID, err) - return nil, false + // We're already connected to our desired number of relays. Nothing to do here. + if len(rf.relays) == rf.conf.desiredRelays { + return + } + // During the startup phase, we don't want to connect to the first candidate that we find. + // Instead, we wait until we've found at least minCandidates, and then select the best of those. + // However, if that takes too long (longer than bootDelay), + if len(rf.relays) == 0 && len(rf.candidates) < rf.conf.minCandidates && time.Since(rf.bootTime) < rf.conf.bootDelay { + return } - var supportsv1, supportsv2 bool -protoLoop: - for _, proto := range protos { - switch proto { - case protoIDv1: - supportsv1 = true - case protoIDv2: - supportsv2 = true - break protoLoop + candidates := rf.selectCandidates() + + // We now iterate over the candidates, attempting (sequentially) to get reservations with them, until + // we reach the desired number of relays. + rf.refCount.Add(1) + go func() { + defer rf.refCount.Done() + + for _, cand := range candidates { + id := cand.ai.ID + var failed bool + var rsvp *circuitv2.Reservation + + // make sure we're still connected. + if rf.host.Network().Connectedness(id) != network.Connected { + if err := rf.host.Connect(ctx, cand.ai); err != nil { + log.Debugw("failed to reconnect", "peer", cand.ai, "error", err) + rf.candidateMx.Lock() + delete(rf.candidates, cand.ai.ID) + rf.candidateMx.Unlock() + continue + } + } + if !cand.isRelayV1 { + var err error + rsvp, err = circuitv2.Reserve(ctx, rf.host, cand.ai) + if err != nil { + failed = true + log.Debugw("failed to reserve slot", "id", id, "error", err) + } + } else { + ok, err := relayv1.CanHop(ctx, rf.host, id) + if err != nil { + failed = true + log.Debugw("error querying relay for v1 hop", "id", id, "error", err) + } + if !ok { + failed = true + log.Debugw("relay can't hop", "id", id) + } + } + rf.candidateMx.Lock() + if failed { + cand.numAttempts++ + delete(rf.candidates, id) + // We failed to obtain a reservation for too many times. We give up. + if cand.numAttempts >= rf.conf.maxAttempts { + log.Debugw("failed to obtain a reservation with. Giving up.", "id", id, "num attempts", cand.numAttempts) + } else { + rf.moveCandidateToBackoff(cand) + } + rf.candidateMx.Unlock() + continue + } + rf.candidateMx.Unlock() + log.Debugw("adding new relay", "id", id) + rf.relayMx.Lock() + rf.relays[id] = rsvp + numRelays := len(rf.relays) + rf.relayMx.Unlock() + + rf.host.ConnManager().Protect(id, autorelayTag) // protect the connection + + select { + case rf.relayUpdated <- struct{}{}: + default: + } + if numRelays >= rf.conf.desiredRelays { + break + } } + }() +} + +// must be called with mutex locked +func (rf *relayFinder) moveCandidateToBackoff(cand *candidate) { + if len(rf.candidatesOnBackoff) >= rf.conf.maxCandidates { + log.Debugw("already have enough candidates on backoff. Dropping.", "id", cand.ai.ID) + return } + log.Debugw("moving candidate to backoff", "id", cand.ai.ID) + rf.candidatesOnBackoff = append(rf.candidatesOnBackoff, &candidateOnBackoff{ + candidate: *cand, + nextConnAttempt: time.Now().Add(rf.conf.backoff), + }) +} + +func (rf *relayFinder) checkForCandidatesOnBackoff(now time.Time) { + rf.candidateMx.Lock() + defer rf.candidateMx.Unlock() - switch { - case supportsv2: - rsvp, err := circuitv2.Reserve(ctx, ar.host, pi) - if err != nil { - log.Debugf("error reserving slot with %s: %s", pi.ID, err) - return nil, false + for _, cand := range rf.candidatesOnBackoff { + if cand.nextConnAttempt.After(now) { + break } - return rsvp, true - case supportsv1: - ok, err := relayv1.CanHop(ctx, ar.host, pi.ID) - if err != nil { - log.Debugf("error querying relay %s for v1 hop: %s", pi.ID, err) - return nil, false + if len(rf.candidates) >= rf.conf.maxCandidates { + // drop this candidate if we already have enough others + log.Debugw("cannot move backoff'ed candidate back. Already have enough candidates.", "id", cand.ai.ID) + } else { + log.Debugw("moving backoff'ed candidate back", "id", cand.ai.ID) + rf.candidates[cand.ai.ID] = &candidate{ + added: cand.added, + isRelayV1: cand.isRelayV1, + ai: cand.ai, + numAttempts: cand.numAttempts, + } + rf.notifyNewCandidate() } - return nil, ok - default: // supports neither, unusable relay. - return nil, false + rf.candidatesOnBackoff = rf.candidatesOnBackoff[1:] } } -func (ar *AutoRelay) connect(ctx context.Context, pi peer.AddrInfo) bool { - ctx, cancel := context.WithTimeout(ctx, 60*time.Second) - defer cancel() +func (rf *relayFinder) refreshReservations(ctx context.Context, now time.Time) bool { + rf.relayMx.Lock() - if len(pi.Addrs) == 0 { - var err error - pi, err = ar.router.FindPeer(ctx, pi.ID) - if err != nil { - log.Debugf("error finding relay peer %s: %s", pi.ID, err.Error()) - return false + // find reservations about to expire and refresh them in parallel + g := new(errgroup.Group) + for p, rsvp := range rf.relays { + if rsvp == nil { // this is a circuit v1 relay, there is no reservation + continue + } + if now.Add(rsvpExpirationSlack).Before(rsvp.Expiration) { + continue } - } - err := ar.host.Connect(ctx, pi) - if err != nil { - log.Debugf("error connecting to relay %s: %s", pi.ID, err.Error()) - return false + p := p + g.Go(func() error { return rf.refreshRelayReservation(ctx, p) }) } + rf.relayMx.Unlock() - // wait for identify to complete in at least one conn so that we can check the supported protocols - conns := ar.host.Network().ConnsToPeer(pi.ID) - if len(conns) == 0 { - return false - } + err := g.Wait() + return err != nil +} - ready := make(chan struct{}, len(conns)) - for _, conn := range conns { - go func(conn network.Conn) { - select { - case <-ar.host.IDService().IdentifyWait(conn): - ready <- struct{}{} - case <-ctx.Done(): - } - }(conn) - } +func (rf *relayFinder) refreshRelayReservation(ctx context.Context, p peer.ID) error { + rsvp, err := circuitv2.Reserve(ctx, rf.host, peer.AddrInfo{ID: p}) - select { - case <-ready: - case <-ctx.Done(): - return false + rf.relayMx.Lock() + defer rf.relayMx.Unlock() + + if err != nil { + log.Debugw("failed to refresh relay slot reservation", "relay", p, "error", err) + + delete(rf.relays, p) + // unprotect the connection + rf.host.ConnManager().Unprotect(p, autorelayTag) + return err } - return true + log.Debugw("refreshed relay slot reservation", "relay", p) + rf.relays[p] = rsvp + return nil } -func (ar *AutoRelay) discoverRelays(ctx context.Context) ([]peer.AddrInfo, error) { - if len(ar.static) > 0 { - return ar.static, nil - } +// usingRelay returns if we're currently using the given relay. +func (rf *relayFinder) usingRelay(p peer.ID) bool { + _, ok := rf.relays[p] + return ok +} - ctx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() - var ret []peer.AddrInfo - ch, err := ar.discover.FindPeers(ctx, RelayRendezvous, discovery.Limit(1000)) - if err != nil { - return nil, err - } - for p := range ch { - ret = append(ret, p) +// selectCandidates returns an ordered slice of relay candidates. +// Callers should attempt to obtain reservations with the candidates in this order. +func (rf *relayFinder) selectCandidates() []*candidate { + rf.candidateMx.Lock() + var candidates []*candidate + for _, cand := range rf.candidates { + candidates = append(candidates, cand) } - return ret, nil -} + rf.candidateMx.Unlock() -func (ar *AutoRelay) selectRelays(ctx context.Context, pis []peer.AddrInfo) []peer.AddrInfo { // TODO: better relay selection strategy; this just selects random relays, // but we should probably use ping latency as the selection metric - rand.Shuffle(len(pis), func(i, j int) { - pis[i], pis[j] = pis[j], pis[i] + rand.Shuffle(len(candidates), func(i, j int) { + candidates[i], candidates[j] = candidates[j], candidates[i] }) - return pis + return candidates } // This function is computes the NATed relay addrs when our status is private: @@ -484,19 +485,15 @@ func (ar *AutoRelay) selectRelays(ctx context.Context, pis []peer.AddrInfo) []pe // - On top of those, we add the relay-specific addrs for the relays to which we are // connected. For each non-private relay addr, we encapsulate the p2p-circuit addr // through which we can be dialed. -func (ar *AutoRelay) relayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { - ar.mx.Lock() - defer ar.mx.Unlock() +func (rf *relayFinder) relayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { + rf.relayMx.Lock() + defer rf.relayMx.Unlock() - if ar.status != network.ReachabilityPrivate { - return addrs + if rf.cachedAddrs != nil && time.Now().Before(rf.cachedAddrsExpiry) { + return rf.cachedAddrs } - if ar.cachedAddrs != nil && time.Now().Before(ar.cachedAddrsExpiry) { - return ar.cachedAddrs - } - - raddrs := make([]ma.Multiaddr, 0, 4*len(ar.relays)+4) + raddrs := make([]ma.Multiaddr, 0, 4*len(rf.relays)+4) // only keep private addrs from the original addr set for _, addr := range addrs { @@ -506,28 +503,41 @@ func (ar *AutoRelay) relayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { } // add relay specific addrs to the list - for p := range ar.relays { - addrs := cleanupAddressSet(ar.host.Peerstore().Addrs(p)) - - circuit, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s/p2p-circuit", p.Pretty())) - if err != nil { - panic(err) - } + for p := range rf.relays { + addrs := cleanupAddressSet(rf.host.Peerstore().Addrs(p)) + circuit := ma.StringCast(fmt.Sprintf("/p2p/%s/p2p-circuit", p.Pretty())) for _, addr := range addrs { pub := addr.Encapsulate(circuit) raddrs = append(raddrs, pub) } } - ar.cachedAddrs = raddrs - ar.cachedAddrsExpiry = time.Now().Add(30 * time.Second) + rf.cachedAddrs = raddrs + rf.cachedAddrsExpiry = time.Now().Add(30 * time.Second) return raddrs } -func (ar *AutoRelay) Close() error { - ar.ctxCancel() - ar.refCount.Wait() +func (rf *relayFinder) Start() error { + if rf.ctxCancel != nil { + return errors.New("relayFinder already running") + } + log.Debug("starting relay finder") + ctx, cancel := context.WithCancel(context.Background()) + rf.ctxCancel = cancel + rf.refCount.Add(1) + go func() { + defer rf.refCount.Done() + rf.background(ctx) + }() + return nil +} + +func (rf *relayFinder) Stop() error { + log.Debug("stopping relay finder") + rf.ctxCancel() + rf.refCount.Wait() + rf.ctxCancel = nil return nil } From 7e767cbc22a14467807d93c8ef5c9a9ecdb89bb0 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Wed, 30 Mar 2022 13:24:24 +0100 Subject: [PATCH 3/9] correctly handle static relays --- config/config.go | 2 +- p2p/host/autorelay/autorelay.go | 28 +++++++++++++++--- p2p/host/autorelay/autorelay_test.go | 27 ++++++++++++++--- p2p/host/autorelay/options.go | 6 ++-- p2p/host/autorelay/relay_finder.go | 44 +++++++++++++++------------- 5 files changed, 74 insertions(+), 33 deletions(-) diff --git a/config/config.go b/config/config.go index f3dc0ef410..880efc9ee5 100644 --- a/config/config.go +++ b/config/config.go @@ -100,8 +100,8 @@ type Config struct { Routing RoutingC EnableAutoRelay bool + AutoRelayOpts []autorelay.Option AutoNATConfig - AutoRelayOpts []autorelay.Option EnableHolePunching bool HolePunchingOptions []holepunch.Option diff --git a/p2p/host/autorelay/autorelay.go b/p2p/host/autorelay/autorelay.go index 07aa0a8fb7..2ed66def77 100644 --- a/p2p/host/autorelay/autorelay.go +++ b/p2p/host/autorelay/autorelay.go @@ -22,13 +22,14 @@ type AutoRelay struct { ctx context.Context ctxCancel context.CancelFunc + conf *config + mx sync.Mutex status network.Reachability relayFinder *relayFinder - peerChanIn <-chan peer.AddrInfo // capacity 0 - peerChanOut chan peer.AddrInfo // capacity 20 + peerChanOut chan peer.AddrInfo // capacity 20 host host.Host addrsF basic.AddrsFactory @@ -47,8 +48,8 @@ func NewAutoRelay(bhost *basic.BasicHost, opts ...Option) (*AutoRelay, error) { return nil, err } } - r.peerChanIn = conf.peerChan r.peerChanOut = make(chan peer.AddrInfo, conf.maxCandidates) + r.conf = &conf r.relayFinder = newRelayFinder(bhost, r.peerChanOut, &conf) bhost.AddrsFactory = r.hostAddrs @@ -68,6 +69,25 @@ func (r *AutoRelay) background() { } defer subReachability.Close() + var peerChan <-chan peer.AddrInfo + if len(r.conf.staticRelays) == 0 { + peerChan = r.conf.peerChan + } else { + pc := make(chan peer.AddrInfo) + peerChan = pc + r.refCount.Add(1) + go func() { + defer r.refCount.Done() + for _, sr := range r.conf.staticRelays { + select { + case pc <- sr: + case <-r.ctx.Done(): + return + } + } + }() + } + for { select { case <-r.ctx.Done(): @@ -89,7 +109,7 @@ func (r *AutoRelay) background() { r.mx.Lock() r.status = evt.Reachability r.mx.Unlock() - case pi := <-r.peerChanIn: + case pi := <-peerChan: select { case r.peerChanOut <- pi: // if there's space in the channel, great default: diff --git a/p2p/host/autorelay/autorelay_test.go b/p2p/host/autorelay/autorelay_test.go index 7ead52b296..6abd1f70ff 100644 --- a/p2p/host/autorelay/autorelay_test.go +++ b/p2p/host/autorelay/autorelay_test.go @@ -83,7 +83,6 @@ func newBrokenRelay(t *testing.T, workAfter int) host.Host { require.NoError(t, err) var n int32 h.SetStreamHandler(circuitv2_proto.ProtoIDv2Hop, func(str network.Stream) { - t.Log("rejecting reservation") str.Reset() num := atomic.AddInt32(&n, 1) if int(num) >= workAfter { @@ -189,12 +188,32 @@ func TestMaxBackoffs(t *testing.T) { ) defer h.Close() - r1 := newBrokenRelay(t, 4) - t.Cleanup(func() { r1.Close() }) - peerChan <- peer.AddrInfo{ID: r1.ID(), Addrs: r1.Addrs()} + r := newBrokenRelay(t, 4) + t.Cleanup(func() { r.Close() }) + peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()} // make sure we don't add any relays yet require.Never(t, func() bool { return len(ma.FilterAddrs(h.Addrs(), isRelayAddr)) > 0 }, 300*time.Millisecond, 50*time.Millisecond) } + +func TestStaticRelays(t *testing.T) { + const numRelays = 3 + var staticRelays []peer.AddrInfo + for i := 0; i < numRelays; i++ { + r := newRelay(t) + t.Cleanup(func() { r.Close() }) + staticRelays = append(staticRelays, peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}) + } + + h := newPrivateNode(t, + autorelay.WithStaticRelays(staticRelays), + autorelay.WithNumRelays(1), + ) + defer h.Close() + + require.Eventually(t, func() bool { + return len(ma.FilterAddrs(h.Addrs(), isRelayAddr)) > 0 + }, 2*time.Second, 50*time.Millisecond) +} diff --git a/p2p/host/autorelay/options.go b/p2p/host/autorelay/options.go index e6cd0dc817..41ab1ef829 100644 --- a/p2p/host/autorelay/options.go +++ b/p2p/host/autorelay/options.go @@ -60,11 +60,11 @@ func init() { type Option func(*config) error func WithStaticRelays(static []peer.AddrInfo) Option { - return func(r *config) error { - if len(r.staticRelays) > 0 { + return func(c *config) error { + if len(c.staticRelays) > 0 { return errors.New("can't set static relays, static relays already configured") } - r.staticRelays = static + c.staticRelays = static return nil } } diff --git a/p2p/host/autorelay/relay_finder.go b/p2p/host/autorelay/relay_finder.go index 4bb5c20c70..fa42678632 100644 --- a/p2p/host/autorelay/relay_finder.go +++ b/p2p/host/autorelay/relay_finder.go @@ -95,13 +95,11 @@ func newRelayFinder(host *basic.BasicHost, peerChan <-chan peer.AddrInfo, conf * } func (rf *relayFinder) background(ctx context.Context) { - if len(rf.conf.staticRelays) == 0 { - rf.refCount.Add(1) - go func() { - defer rf.refCount.Done() - rf.findNodes(ctx) - }() - } + rf.refCount.Add(1) + go func() { + defer rf.refCount.Done() + rf.findNodes(ctx) + }() subConnectedness, err := rf.host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged)) if err != nil { @@ -138,10 +136,8 @@ func (rf *relayFinder) background(ctx context.Context) { } rf.relayMx.Unlock() case <-rf.candidateFound: - log.Debugf("candidate found") rf.handleNewCandidate(ctx) case <-bootDelayTimer.C: - log.Debugf("boot delay timer") rf.handleNewCandidate(ctx) case <-rf.relayUpdated: push = true @@ -163,7 +159,7 @@ func (rf *relayFinder) background(ctx context.Context) { } // findNodes accepts nodes from the channel and tests if they support relaying. -// It is run on both public and private nodes (but not when static relays are set). +// It is run on both public and private nodes. // It garbage collects old entries, so that nodes doesn't overflow. // This makes sure that as soon as we need to find relay candidates, we have them available. func (rf *relayFinder) findNodes(ctx context.Context) { @@ -189,6 +185,13 @@ func (rf *relayFinder) findNodes(ctx context.Context) { } } +func (rf *relayFinder) notifyNewCandidate() { + select { + case rf.candidateFound <- struct{}{}: + default: + } +} + // handleNewNode tests if a peer supports circuit v1 or v2. // This method is only run on private nodes. // If a peer does, it is added to the candidates map. @@ -220,13 +223,6 @@ func (rf *relayFinder) handleNewNode(ctx context.Context, pi peer.AddrInfo) { rf.notifyNewCandidate() } -func (rf *relayFinder) notifyNewCandidate() { - select { - case rf.candidateFound <- struct{}{}: - default: - } -} - // tryNode checks if a peer actually supports either circuit v1 or circuit v2. // It does not modify any internal state. func (rf *relayFinder) tryNode(ctx context.Context, pi peer.AddrInfo) (supportsRelayV1 bool, err error) { @@ -293,10 +289,16 @@ func (rf *relayFinder) handleNewCandidate(ctx context.Context) { if len(rf.relays) == rf.conf.desiredRelays { return } - // During the startup phase, we don't want to connect to the first candidate that we find. - // Instead, we wait until we've found at least minCandidates, and then select the best of those. - // However, if that takes too long (longer than bootDelay), - if len(rf.relays) == 0 && len(rf.candidates) < rf.conf.minCandidates && time.Since(rf.bootTime) < rf.conf.bootDelay { + + if len(rf.conf.staticRelays) != 0 { + // make sure we read all static relays before continuing + if len(rf.peerChan) > 0 && len(rf.candidates) < rf.conf.minCandidates && time.Since(rf.bootTime) < rf.conf.bootDelay { + return + } + } else if len(rf.relays) == 0 && len(rf.candidates) < rf.conf.minCandidates && time.Since(rf.bootTime) < rf.conf.bootDelay { + // During the startup phase, we don't want to connect to the first candidate that we find. + // Instead, we wait until we've found at least minCandidates, and then select the best of those. + // However, if that takes too long (longer than bootDelay), we still go ahead. return } From 98f46f2e842a797be88b09c2ebb0929ee88b2968 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Tue, 5 Apr 2022 14:19:19 +0100 Subject: [PATCH 4/9] rename autorelay.WithNumCandidates to WithMaxCandidates --- p2p/host/autorelay/autorelay_test.go | 2 +- p2p/host/autorelay/options.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/p2p/host/autorelay/autorelay_test.go b/p2p/host/autorelay/autorelay_test.go index 6abd1f70ff..9f4c7f7a82 100644 --- a/p2p/host/autorelay/autorelay_test.go +++ b/p2p/host/autorelay/autorelay_test.go @@ -109,7 +109,7 @@ func TestSingleRelay(t *testing.T) { }() h := newPrivateNode(t, autorelay.WithPeerSource(peerChan), - autorelay.WithNumCandidates(1), + autorelay.WithMaxCandidates(1), autorelay.WithNumRelays(99999), autorelay.WithBootDelay(0), ) diff --git a/p2p/host/autorelay/options.go b/p2p/host/autorelay/options.go index 41ab1ef829..76022680db 100644 --- a/p2p/host/autorelay/options.go +++ b/p2p/host/autorelay/options.go @@ -13,7 +13,7 @@ type config struct { staticRelays []peer.AddrInfo // see WithMinCandidates minCandidates int - // see WithNumCandidates + // see WithMaxCandidates maxCandidates int // Delay until we obtain reservations with relays, if we have less than minCandidates candidates. // See WithBootDelay. @@ -88,8 +88,8 @@ func WithNumRelays(n int) Option { } } -// WithNumCandidates sets the number of relay candidates that we buffer. -func WithNumCandidates(n int) Option { +// WithMaxCandidates sets the number of relay candidates that we buffer. +func WithMaxCandidates(n int) Option { return func(c *config) error { c.maxCandidates = n return nil From 6bb54b57734bca291fcfcbfbaa982db44fa91f9a Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Tue, 5 Apr 2022 15:06:42 +0100 Subject: [PATCH 5/9] fix logic to prefer circuit v2 over circuit v1 --- p2p/host/autorelay/autorelay_test.go | 25 +++++++++++++++++++++++++ p2p/host/autorelay/options.go | 2 +- p2p/host/autorelay/relay_finder.go | 28 ++++++++++++++-------------- 3 files changed, 40 insertions(+), 15 deletions(-) diff --git a/p2p/host/autorelay/autorelay_test.go b/p2p/host/autorelay/autorelay_test.go index 9f4c7f7a82..147630400f 100644 --- a/p2p/host/autorelay/autorelay_test.go +++ b/p2p/host/autorelay/autorelay_test.go @@ -8,6 +8,7 @@ import ( "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/p2p/host/autorelay" + relayv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay" circuitv2_proto "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto" relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay" @@ -125,6 +126,30 @@ func TestSingleRelay(t *testing.T) { }, 200*time.Millisecond, 50*time.Millisecond) } +func TestPreferRelayV2(t *testing.T) { + r := newRelay(t) + defer r.Close() + // The relay supports both v1 and v2. The v1 stream handler should never be called, + // if we prefer v2 relays. + r.SetStreamHandler(relayv1.ProtoID, func(str network.Stream) { + str.Reset() + t.Fatal("used relay v1") + }) + peerChan := make(chan peer.AddrInfo, 1) + peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()} + h := newPrivateNode(t, + autorelay.WithPeerSource(peerChan), + autorelay.WithMaxCandidates(1), + autorelay.WithNumRelays(99999), + autorelay.WithBootDelay(0), + ) + defer h.Close() + + require.Eventually(t, func() bool { + return len(ma.FilterAddrs(h.Addrs(), isRelayAddr)) > 0 + }, 3*time.Second, 100*time.Millisecond) +} + func TestWaitForCandidates(t *testing.T) { peerChan := make(chan peer.AddrInfo) h := newPrivateNode(t, diff --git a/p2p/host/autorelay/options.go b/p2p/host/autorelay/options.go index 76022680db..01b7ac981d 100644 --- a/p2p/host/autorelay/options.go +++ b/p2p/host/autorelay/options.go @@ -126,7 +126,7 @@ func WithBackoff(d time.Duration) Option { } // WithMaxAttempts sets the number of times we attempt to obtain a reservation with a candidate. -// If we fail still fail to obtain a reservation, this candidate is dropped. +// If we still fail to obtain a reservation, this candidate is dropped. func WithMaxAttempts(n int) Option { return func(c *config) error { c.maxAttempts = n diff --git a/p2p/host/autorelay/relay_finder.go b/p2p/host/autorelay/relay_finder.go index fa42678632..98ec562f9e 100644 --- a/p2p/host/autorelay/relay_finder.go +++ b/p2p/host/autorelay/relay_finder.go @@ -43,10 +43,10 @@ const ( ) type candidate struct { - added time.Time - isRelayV1 bool - ai peer.AddrInfo - numAttempts int + added time.Time + supportsRelayV2 bool + ai peer.AddrInfo + numAttempts int } type candidateOnBackoff struct { @@ -206,7 +206,7 @@ func (rf *relayFinder) handleNewNode(ctx context.Context, pi peer.AddrInfo) { ctx, cancel := context.WithTimeout(ctx, 20*time.Second) defer cancel() - supportsV1, err := rf.tryNode(ctx, pi) + supportsV2, err := rf.tryNode(ctx, pi) if err != nil { log.Debugf("node %s not accepted as a candidate: %s", pi.ID, err) return @@ -216,8 +216,8 @@ func (rf *relayFinder) handleNewNode(ctx context.Context, pi peer.AddrInfo) { rf.candidateMx.Unlock() return } - log.Debugw("node supports relay protocol", "peer", pi.ID, "supports circuitv2", !supportsV1) - rf.candidates[pi.ID] = &candidate{ai: pi, isRelayV1: supportsV1} + log.Debugw("node supports relay protocol", "peer", pi.ID, "supports circuit v2", supportsV2) + rf.candidates[pi.ID] = &candidate{ai: pi, supportsRelayV2: supportsV2} rf.candidateMx.Unlock() rf.notifyNewCandidate() @@ -225,7 +225,7 @@ func (rf *relayFinder) handleNewNode(ctx context.Context, pi peer.AddrInfo) { // tryNode checks if a peer actually supports either circuit v1 or circuit v2. // It does not modify any internal state. -func (rf *relayFinder) tryNode(ctx context.Context, pi peer.AddrInfo) (supportsRelayV1 bool, err error) { +func (rf *relayFinder) tryNode(ctx context.Context, pi peer.AddrInfo) (supportsRelayV2 bool, err error) { if err := rf.host.Connect(ctx, pi); err != nil { return false, fmt.Errorf("error connecting to relay %s: %w", pi.ID, err) } @@ -276,7 +276,7 @@ func (rf *relayFinder) tryNode(ctx context.Context, pi peer.AddrInfo) (supportsR if !supportsV1 && !supportsV2 { return false, errors.New("doesn't speak circuit v1 or v2") } - return supportsV1, nil + return supportsV2, nil } func (rf *relayFinder) handleNewCandidate(ctx context.Context) { @@ -325,7 +325,7 @@ func (rf *relayFinder) handleNewCandidate(ctx context.Context) { continue } } - if !cand.isRelayV1 { + if cand.supportsRelayV2 { var err error rsvp, err = circuitv2.Reserve(ctx, rf.host, cand.ai) if err != nil { @@ -403,10 +403,10 @@ func (rf *relayFinder) checkForCandidatesOnBackoff(now time.Time) { } else { log.Debugw("moving backoff'ed candidate back", "id", cand.ai.ID) rf.candidates[cand.ai.ID] = &candidate{ - added: cand.added, - isRelayV1: cand.isRelayV1, - ai: cand.ai, - numAttempts: cand.numAttempts, + added: cand.added, + supportsRelayV2: cand.supportsRelayV2, + ai: cand.ai, + numAttempts: cand.numAttempts, } rf.notifyNewCandidate() } From ca6ded2a81c5bc0293807474666328c810f9c002 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Tue, 5 Apr 2022 15:13:43 +0100 Subject: [PATCH 6/9] fix context leak when autorelay option errors --- p2p/host/autorelay/autorelay.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/host/autorelay/autorelay.go b/p2p/host/autorelay/autorelay.go index 2ed66def77..a3c744f18f 100644 --- a/p2p/host/autorelay/autorelay.go +++ b/p2p/host/autorelay/autorelay.go @@ -41,13 +41,13 @@ func NewAutoRelay(bhost *basic.BasicHost, opts ...Option) (*AutoRelay, error) { addrsF: bhost.AddrsFactory, status: network.ReachabilityUnknown, } - r.ctx, r.ctxCancel = context.WithCancel(context.Background()) conf := defaultConfig for _, opt := range opts { if err := opt(&conf); err != nil { return nil, err } } + r.ctx, r.ctxCancel = context.WithCancel(context.Background()) r.peerChanOut = make(chan peer.AddrInfo, conf.maxCandidates) r.conf = &conf r.relayFinder = newRelayFinder(bhost, r.peerChanOut, &conf) From 220166ce780f837c1f146f2f3c6bec0c7348028d Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Tue, 5 Apr 2022 17:46:46 +0100 Subject: [PATCH 7/9] make sure that all static relay servers are used --- p2p/host/autorelay/options.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/p2p/host/autorelay/options.go b/p2p/host/autorelay/options.go index 01b7ac981d..91be40084a 100644 --- a/p2p/host/autorelay/options.go +++ b/p2p/host/autorelay/options.go @@ -23,7 +23,8 @@ type config struct { // If we fail to obtain a reservation more than maxAttempts, we stop trying. maxAttempts int // Number of relays we strive to obtain a reservation with. - desiredRelays int + desiredRelays int + setMinCandidates bool } var defaultConfig = config{ @@ -35,6 +36,8 @@ var defaultConfig = config{ desiredRelays: 2, } +var errStaticRelaysMinCandidates = errors.New("cannot use WithMinCandidates and WithStaticRelays") + // DefaultRelays are the known PL-operated v1 relays; will be decommissioned in 2022. var DefaultRelays = []string{ "/ip4/147.75.80.110/tcp/4001/p2p/QmbFgm5zan8P6eWWmeyfncR5feYEMPbht5b1FW1C37aQ7y", @@ -61,9 +64,13 @@ type Option func(*config) error func WithStaticRelays(static []peer.AddrInfo) Option { return func(c *config) error { + if c.setMinCandidates { + return errStaticRelaysMinCandidates + } if len(c.staticRelays) > 0 { return errors.New("can't set static relays, static relays already configured") } + c.minCandidates = len(static) c.staticRelays = static return nil } @@ -101,7 +108,11 @@ func WithMaxCandidates(n int) Option { // This is to make sure that we don't just randomly connect to the first candidate that we discover. func WithMinCandidates(n int) Option { return func(c *config) error { + if len(c.staticRelays) > 0 { + return errStaticRelaysMinCandidates + } c.minCandidates = n + c.setMinCandidates = true return nil } } From 44350ef427c68afe392bfd439e8093aab7275519 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Tue, 5 Apr 2022 18:22:58 +0100 Subject: [PATCH 8/9] use exponential backoff, with jitter --- p2p/host/autorelay/relay_finder.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/p2p/host/autorelay/relay_finder.go b/p2p/host/autorelay/relay_finder.go index 98ec562f9e..f803fcf7c9 100644 --- a/p2p/host/autorelay/relay_finder.go +++ b/p2p/host/autorelay/relay_finder.go @@ -383,9 +383,12 @@ func (rf *relayFinder) moveCandidateToBackoff(cand *candidate) { return } log.Debugw("moving candidate to backoff", "id", cand.ai.ID) + backoff := rf.conf.backoff * (1 << (cand.numAttempts - 1)) + // introduce a bit of jitter + backoff = (backoff * time.Duration(16+rand.Intn(8))) / time.Duration(20) rf.candidatesOnBackoff = append(rf.candidatesOnBackoff, &candidateOnBackoff{ candidate: *cand, - nextConnAttempt: time.Now().Add(rf.conf.backoff), + nextConnAttempt: time.Now().Add(backoff), }) } From 46fc1e50825b6235d6a8edae07d1d07deb5debfa Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 10 Apr 2022 12:12:48 +0100 Subject: [PATCH 9/9] fix race condition when the AutoRelay peerChan fills up --- p2p/host/autorelay/autorelay.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/p2p/host/autorelay/autorelay.go b/p2p/host/autorelay/autorelay.go index a3c744f18f..bcf8432c8e 100644 --- a/p2p/host/autorelay/autorelay.go +++ b/p2p/host/autorelay/autorelay.go @@ -114,7 +114,10 @@ func (r *AutoRelay) background() { case r.peerChanOut <- pi: // if there's space in the channel, great default: // no space left in the channel. Drop the oldest entry. - <-r.peerChanOut + select { + case <-r.peerChanOut: + default: // The consumer might just have emptied the channel. Make sure we don't block in that case. + } r.peerChanOut <- pi } }