diff --git a/p2p/host/relay/addrsplosion.go b/p2p/host/relay/addrsplosion.go new file mode 100644 index 0000000000..89965bca41 --- /dev/null +++ b/p2p/host/relay/addrsplosion.go @@ -0,0 +1,167 @@ +package relay + +import ( + "encoding/binary" + + circuit "github.com/libp2p/go-libp2p-circuit" + ma "github.com/multiformats/go-multiaddr" + dns "github.com/multiformats/go-multiaddr-dns" + manet "github.com/multiformats/go-multiaddr-net" +) + +// This function cleans up a relay's address set to remove private addresses and curtail +// addrsplosion. +func cleanupAddressSet(addrs []ma.Multiaddr) []ma.Multiaddr { + var public, private []ma.Multiaddr + + for _, a := range addrs { + if isRelayAddr(a) { + continue + } + + if manet.IsPublicAddr(a) || isDNSAddr(a) { + public = append(public, a) + continue + } + + // discard unroutable addrs + if manet.IsPrivateAddr(a) { + private = append(private, a) + } + } + + if !hasAddrsplosion(public) { + return public + } + + return sanitizeAddrsplodedSet(public, private) +} + +func isRelayAddr(a ma.Multiaddr) bool { + isRelay := false + + ma.ForEach(a, func(c ma.Component) bool { + switch c.Protocol().Code { + case circuit.P_CIRCUIT: + isRelay = true + return false + default: + return true + } + }) + + return isRelay +} + +func isDNSAddr(a ma.Multiaddr) bool { + if first, _ := ma.SplitFirst(a); first != nil { + switch first.Protocol().Code { + case dns.P_DNS4, dns.P_DNS6, dns.P_DNSADDR: + return true + } + } + return false +} + +// we have addrsplosion if for some protocol we advertise multiple ports on +// the same base address. 
+func hasAddrsplosion(addrs []ma.Multiaddr) bool { + aset := make(map[string]int) + + for _, a := range addrs { + key, port := addrKeyAndPort(a) + xport, ok := aset[key] + if ok && port != xport { + return true + } + aset[key] = port + } + + return false +} + +func addrKeyAndPort(a ma.Multiaddr) (string, int) { + var ( + key string + port int + ) + + ma.ForEach(a, func(c ma.Component) bool { + switch c.Protocol().Code { + case ma.P_TCP, ma.P_UDP: + port = int(binary.BigEndian.Uint16(c.RawValue())) + key += "/" + c.Protocol().Name + default: + val := c.Value() + if val == "" { + val = c.Protocol().Name + } + key += "/" + val + } + return true + }) + + return key, port +} + +// clean up addrsplosion +// the following heuristic is used: +// - for each base address/protocol combination, if there are multiple ports advertised then +// only accept the default port if present. +// - If the default port is not present, we check for non-standard ports by tracking +// private port bindings if present. 
+// - If there is no default or private port binding, then we can't infer the correct +// port and give up and return all addrs (for that base address) +func sanitizeAddrsplodedSet(public, private []ma.Multiaddr) []ma.Multiaddr { + type portAndAddr struct { + addr ma.Multiaddr + port int + } + + privports := make(map[int]struct{}) + pubaddrs := make(map[string][]portAndAddr) + + for _, a := range private { + _, port := addrKeyAndPort(a) + privports[port] = struct{}{} + } + + for _, a := range public { + key, port := addrKeyAndPort(a) + pubaddrs[key] = append(pubaddrs[key], portAndAddr{addr: a, port: port}) + } + + var result []ma.Multiaddr + for _, pas := range pubaddrs { + if len(pas) == 1 { + // it's not addrsploded + result = append(result, pas[0].addr) + continue + } + + haveAddr := false + for _, pa := range pas { + if _, ok := privports[pa.port]; ok { + // it matches a privately bound port, use it + result = append(result, pa.addr) + haveAddr = true + continue + } + + if pa.port == 4001 || pa.port == 4002 { + // it's a default port, use it + result = append(result, pa.addr) + haveAddr = true + } + } + + if !haveAddr { + // we weren't able to select a port; bite the bullet and use them all + for _, pa := range pas { + result = append(result, pa.addr) + } + } + } + + return result +} diff --git a/p2p/host/relay/addrsplosion_test.go b/p2p/host/relay/addrsplosion_test.go new file mode 100644 index 0000000000..cdb10def63 --- /dev/null +++ b/p2p/host/relay/addrsplosion_test.go @@ -0,0 +1,122 @@ +package relay + +import ( + "testing" + + ma "github.com/multiformats/go-multiaddr" + _ "github.com/multiformats/go-multiaddr-dns" +) + +func TestCleanupAddrs(t *testing.T) { + // test with no addrsplosion + addrs := makeAddrList( + "/ip4/127.0.0.1/tcp/4001", + "/ip4/127.0.0.1/udp/4002/quic", + "/ip4/1.2.3.4/tcp/4001", + "/ip4/1.2.3.4/udp/4002/quic", + "/dnsaddr/somedomain.com/tcp/4002/ws", + ) + clean := makeAddrList( + "/ip4/1.2.3.4/tcp/4001", + 
"/ip4/1.2.3.4/udp/4002/quic", + "/dnsaddr/somedomain.com/tcp/4002/ws", + ) + + r := cleanupAddressSet(addrs) + if !sameAddrs(clean, r) { + t.Fatal("cleaned up set doesn't match expected") + } + + // test with default port addrsplosion + addrs = makeAddrList( + "/ip4/127.0.0.1/tcp/4001", + "/ip4/1.2.3.4/tcp/4001", + "/ip4/1.2.3.4/tcp/33333", + "/ip4/1.2.3.4/tcp/33334", + "/ip4/1.2.3.4/tcp/33335", + "/ip4/1.2.3.4/udp/4002/quic", + ) + clean = makeAddrList( + "/ip4/1.2.3.4/tcp/4001", + "/ip4/1.2.3.4/udp/4002/quic", + ) + r = cleanupAddressSet(addrs) + if !sameAddrs(clean, r) { + t.Fatal("cleaned up set doesn't match expected") + } + + // test with default port addrsplosion but no private addrs + addrs = makeAddrList( + "/ip4/1.2.3.4/tcp/4001", + "/ip4/1.2.3.4/tcp/33333", + "/ip4/1.2.3.4/tcp/33334", + "/ip4/1.2.3.4/tcp/33335", + "/ip4/1.2.3.4/udp/4002/quic", + ) + clean = makeAddrList( + "/ip4/1.2.3.4/tcp/4001", + "/ip4/1.2.3.4/udp/4002/quic", + ) + r = cleanupAddressSet(addrs) + if !sameAddrs(clean, r) { + t.Fatal("cleaned up set doesn't match expected") + } + + // test with non-standard port addrsplosion + addrs = makeAddrList( + "/ip4/127.0.0.1/tcp/12345", + "/ip4/1.2.3.4/tcp/12345", + "/ip4/1.2.3.4/tcp/33333", + "/ip4/1.2.3.4/tcp/33334", + "/ip4/1.2.3.4/tcp/33335", + ) + clean = makeAddrList( + "/ip4/1.2.3.4/tcp/12345", + ) + r = cleanupAddressSet(addrs) + if !sameAddrs(clean, r) { + t.Fatal("cleaned up set doesn't match expected") + } + + // test with a squeaky clean address set + addrs = makeAddrList( + "/ip4/1.2.3.4/tcp/4001", + "/ip4/1.2.3.4/udp/4001/quic", + ) + clean = addrs + r = cleanupAddressSet(addrs) + if !sameAddrs(clean, r) { + t.Fatal("cleaned up set doesn't match expected") + } +} + +func makeAddrList(strs ...string) []ma.Multiaddr { + result := make([]ma.Multiaddr, 0, len(strs)) + for _, s := range strs { + a := ma.StringCast(s) + result = append(result, a) + } + return result +} + +func sameAddrs(as, bs []ma.Multiaddr) bool { + if len(as) != 
len(bs) { + return false + } + + for _, a := range as { + if !findAddr(a, bs) { + return false + } + } + return true +} + +func findAddr(a ma.Multiaddr, bs []ma.Multiaddr) bool { + for _, b := range bs { + if a.Equal(b) { + return true + } + } + return false +} diff --git a/p2p/host/relay/autorelay.go b/p2p/host/relay/autorelay.go index fbbbeef565..0c8b681256 100644 --- a/p2p/host/relay/autorelay.go +++ b/p2p/host/relay/autorelay.go @@ -41,8 +41,8 @@ type AutoRelay struct { disconnect chan struct{} mx sync.Mutex - relays map[peer.ID]pstore.PeerInfo - addrs []ma.Multiaddr + relays map[peer.ID]struct{} + status autonat.NATStatus } func NewAutoRelay(ctx context.Context, bhost *basic.BasicHost, discover discovery.Discoverer, router routing.PeerRouting) *AutoRelay { @@ -51,8 +51,9 @@ func NewAutoRelay(ctx context.Context, bhost *basic.BasicHost, discover discover discover: discover, router: router, addrsF: bhost.AddrsFactory, - relays: make(map[peer.ID]pstore.PeerInfo), + relays: make(map[peer.ID]struct{}), disconnect: make(chan struct{}, 1), + status: autonat.NATStatusUnknown, } ar.autonat = autonat.NewAutoNAT(ctx, bhost, ar.baseAddrs) bhost.AddrsFactory = ar.hostAddrs @@ -61,20 +62,14 @@ func NewAutoRelay(ctx context.Context, bhost *basic.BasicHost, discover discover return ar } -func (ar *AutoRelay) hostAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { - ar.mx.Lock() - defer ar.mx.Unlock() - if ar.addrs != nil && ar.autonat.Status() == autonat.NATStatusPrivate { - return ar.addrs - } else { - return ar.addrsF(addrs) - } -} - func (ar *AutoRelay) baseAddrs() []ma.Multiaddr { return ar.addrsF(ar.host.AllAddrs()) } +func (ar *AutoRelay) hostAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { + return ar.relayAddrs(ar.addrsF(addrs)) +} + func (ar *AutoRelay) background(ctx context.Context) { select { case <-time.After(autonat.AutoNATBootDelay + BootDelay): @@ -89,37 +84,37 @@ func (ar *AutoRelay) background(ctx context.Context) { wait := autonat.AutoNATRefreshInterval switch 
ar.autonat.Status() { case autonat.NATStatusUnknown: + ar.mx.Lock() + ar.status = autonat.NATStatusUnknown + ar.mx.Unlock() wait = autonat.AutoNATRetryInterval case autonat.NATStatusPublic: - // invalidate addrs ar.mx.Lock() - if ar.addrs != nil { - ar.addrs = nil + if ar.status != autonat.NATStatusPublic { push = true } + ar.status = autonat.NATStatusPublic ar.mx.Unlock() - // if we had previously announced relay addrs, push our public addrs - if push { - push = false - ar.host.PushIdentify() + case autonat.NATStatusPrivate: + update := ar.findRelays(ctx) + ar.mx.Lock() + if update || ar.status != autonat.NATStatusPrivate { + push = true } + ar.status = autonat.NATStatusPrivate + ar.mx.Unlock() + } - case autonat.NATStatusPrivate: - push = false // clear, findRelays pushes as needed - ar.findRelays(ctx) + if push { + push = false + ar.host.PushIdentify() } select { case <-ar.disconnect: - // invalidate addrs - ar.mx.Lock() - if ar.addrs != nil { - ar.addrs = nil - push = true - } - ar.mx.Unlock() + push = true case <-time.After(wait): case <-ctx.Done(): return @@ -127,30 +122,46 @@ func (ar *AutoRelay) background(ctx context.Context) { } } -func (ar *AutoRelay) findRelays(ctx context.Context) { +func (ar *AutoRelay) findRelays(ctx context.Context) bool { + retry := 0 + +again: ar.mx.Lock() - if len(ar.relays) >= DesiredRelays { - ar.mx.Unlock() - return - } - need := DesiredRelays - len(ar.relays) + haveRelays := len(ar.relays) ar.mx.Unlock() - - limit := 50 - if need > limit/2 { - limit = 2 * need + if haveRelays >= DesiredRelays { + return false } + need := DesiredRelays - haveRelays + + limit := 1000 dctx, cancel := context.WithTimeout(ctx, 30*time.Second) pis, err := discovery.FindPeers(dctx, ar.discover, RelayRendezvous, limit) cancel() if err != nil { log.Debugf("error discovering relays: %s", err.Error()) - return + + if haveRelays == 0 { + retry++ + if retry > 5 { + log.Debug("no relays connected; giving up") + return false + } + + log.Debug("no relays 
connected; retrying in 30s") + select { + case <-time.After(30 * time.Second): + goto again + case <-ctx.Done(): + return false + } + } } - pis = ar.selectRelays(pis) + log.Debugf("discovered %d relays", len(pis)) + pis = ar.selectRelays(ctx, pis) update := 0 for _, pi := range pis { @@ -181,7 +192,8 @@ func (ar *AutoRelay) findRelays(ctx context.Context) { log.Debugf("connected to relay %s", pi.ID) ar.mx.Lock() - ar.relays[pi.ID] = pi + ar.relays[pi.ID] = struct{}{} + haveRelays++ ar.mx.Unlock() // tag the connection as very important @@ -194,63 +206,80 @@ func (ar *AutoRelay) findRelays(ctx context.Context) { } } - if update > 0 || ar.addrs == nil { - ar.updateAddrs() + if haveRelays == 0 { + // we failed to find any relays and we are not connected to any! + // wait a little and try again, the discovery query might have returned only dead peers + retry++ + if retry > 5 { + log.Debug("no relays connected; giving up") + return false + } + + log.Debug("no relays connected; retrying in 30s") + select { + case <-time.After(30 * time.Second): + goto again + case <-ctx.Done(): + return false + } } + + return update > 0 } -func (ar *AutoRelay) selectRelays(pis []pstore.PeerInfo) []pstore.PeerInfo { +func (ar *AutoRelay) selectRelays(ctx context.Context, pis []pstore.PeerInfo) []pstore.PeerInfo { // TODO better relay selection strategy; this just selects random relays // but we should probably use ping latency as the selection metric + shuffleRelays(pis) return pis } -func (ar *AutoRelay) updateAddrs() { - ar.doUpdateAddrs() - ar.host.PushIdentify() -} - -// This function updates our NATed advertised addrs (ar.addrs) -// The public addrs are rewritten so that they only retain the public IP part; they -// become undialable but are useful as a hint to the dialer to determine whether or not -// to dial private addrs. -// The non-public addrs are included verbatim so that peers behind the same NAT/firewall -// can still dial us directly. 
-// On top of those, we add the relay-specific addrs for the relays to which we are -// connected. For each non-private relay addr, we encapsulate the p2p-circuit addr -// through which we can be dialed. -func (ar *AutoRelay) doUpdateAddrs() { +// This function computes the NATed relay addrs when our status is private: +// - The public addrs are removed from the address set. +// - The private addrs are included verbatim so that peers behind the same NAT/firewall +// can still dial us directly. +// - On top of those, we add the relay-specific addrs for the relays to which we are +// connected. For each non-private relay addr, we encapsulate the p2p-circuit addr +// through which we can be dialed. +func (ar *AutoRelay) relayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { ar.mx.Lock() - defer ar.mx.Unlock() + if ar.status != autonat.NATStatusPrivate { + ar.mx.Unlock() + return addrs + } - addrs := ar.baseAddrs() - raddrs := make([]ma.Multiaddr, 0, len(addrs)+len(ar.relays)) + relays := make([]peer.ID, 0, len(ar.relays)) + for p := range ar.relays { + relays = append(relays, p) + } + ar.mx.Unlock() + + raddrs := make([]ma.Multiaddr, 0, 4*len(relays)+2) - // remove our public addresses from the list + // only keep private addrs from the original addr set for _, addr := range addrs { - if manet.IsPublicAddr(addr) { - continue + if manet.IsPrivateAddr(addr) { + raddrs = append(raddrs, addr) } - raddrs = append(raddrs, addr) } // add relay specific addrs to the list - for _, pi := range ar.relays { - circuit, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s/p2p-circuit", pi.ID.Pretty())) + for _, p := range relays { + addrs := cleanupAddressSet(ar.host.Peerstore().Addrs(p)) + + circuit, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s/p2p-circuit", p.Pretty())) if err != nil { panic(err) } - for _, addr := range pi.Addrs { - if !manet.IsPrivateAddr(addr) { - pub := addr.Encapsulate(circuit) - raddrs = append(raddrs, pub) - } + for _, addr := range addrs { + pub := 
addr.Encapsulate(circuit) + raddrs = append(raddrs, pub) } } - ar.addrs = raddrs + return raddrs } func shuffleRelays(pis []pstore.PeerInfo) { @@ -260,6 +289,7 @@ func shuffleRelays(pis []pstore.PeerInfo) { } } +// Notifee func (ar *AutoRelay) Listen(inet.Network, ma.Multiaddr) {} func (ar *AutoRelay) ListenClose(inet.Network, ma.Multiaddr) {} func (ar *AutoRelay) Connected(inet.Network, inet.Conn) {} diff --git a/p2p/host/relay/autorelay_test.go b/p2p/host/relay/autorelay_test.go index 379abd0ff0..ff3d7601ab 100644 --- a/p2p/host/relay/autorelay_test.go +++ b/p2p/host/relay/autorelay_test.go @@ -26,17 +26,17 @@ import ( // test specific parameters func init() { - autonat.AutoNATIdentifyDelay = 500 * time.Millisecond - autonat.AutoNATBootDelay = 1 * time.Second + autonat.AutoNATIdentifyDelay = 1 * time.Second + autonat.AutoNATBootDelay = 2 * time.Second relay.BootDelay = 1 * time.Second - relay.AdvertiseBootDelay = 1 * time.Millisecond - manet.Private4 = []*net.IPNet{} + relay.AdvertiseBootDelay = 100 * time.Millisecond } // mock routing type mockRoutingTable struct { mx sync.Mutex providers map[string]map[peer.ID]pstore.PeerInfo + peers map[peer.ID]pstore.PeerInfo } type mockRouting struct { @@ -53,7 +53,13 @@ func newMockRouting(h host.Host, tab *mockRoutingTable) *mockRouting { } func (m *mockRouting) FindPeer(ctx context.Context, p peer.ID) (pstore.PeerInfo, error) { - return pstore.PeerInfo{}, routing.ErrNotFound + m.tab.mx.Lock() + defer m.tab.mx.Unlock() + pi, ok := m.tab.peers[p] + if !ok { + return pstore.PeerInfo{}, routing.ErrNotFound + } + return pi, nil } func (m *mockRouting) Provide(ctx context.Context, cid cid.Cid, bcast bool) error { @@ -66,7 +72,12 @@ func (m *mockRouting) Provide(ctx context.Context, cid cid.Cid, bcast bool) erro m.tab.providers[cid.String()] = pmap } - pmap[m.h.ID()] = pstore.PeerInfo{ID: m.h.ID(), Addrs: m.h.Addrs()} + pi := pstore.PeerInfo{ID: m.h.ID(), Addrs: m.h.Addrs()} + pmap[m.h.ID()] = pi + if m.tab.peers == nil { + 
m.tab.peers = make(map[peer.ID]pstore.PeerInfo) + } + m.tab.peers[m.h.ID()] = pi return nil } @@ -133,7 +144,9 @@ func connect(t *testing.T, a, b host.Host) { // and the actual test! func TestAutoRelay(t *testing.T) { - t.Skip("fails 99% of the time") + //t.Skip("fails 99% of the time") + + manet.Private4 = []*net.IPNet{} ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -165,7 +178,7 @@ func TestAutoRelay(t *testing.T) { // connect to AutoNAT and let detection/discovery work its magic connect(t, h1, h3) - time.Sleep(3 * time.Second) + time.Sleep(5 * time.Second) // verify that we now advertise relay addrs (but not unspecific relay addrs) unspecificRelay, err := ma.NewMultiaddr("/p2p-circuit") diff --git a/p2p/host/relay/relay.go b/p2p/host/relay/relay.go index 0f38166984..2adea8fcc6 100644 --- a/p2p/host/relay/relay.go +++ b/p2p/host/relay/relay.go @@ -4,13 +4,13 @@ import ( "context" "time" - circuit "github.com/libp2p/go-libp2p-circuit" discovery "github.com/libp2p/go-libp2p-discovery" ma "github.com/multiformats/go-multiaddr" ) var ( - AdvertiseBootDelay = 30 * time.Second + // this is purposefully long to require some node stability before advertising as a relay + AdvertiseBootDelay = 15 * time.Minute ) // Advertise advertises this node as a libp2p relay. @@ -28,8 +28,7 @@ func Advertise(ctx context.Context, advertise discovery.Advertiser) { func Filter(addrs []ma.Multiaddr) []ma.Multiaddr { raddrs := make([]ma.Multiaddr, 0, len(addrs)) for _, addr := range addrs { - _, err := addr.ValueForProtocol(circuit.P_CIRCUIT) - if err == nil { + if isRelayAddr(addr) { continue } raddrs = append(raddrs, addr)