diff --git a/p2p/host/autonat/autonat.go b/p2p/host/autonat/autonat.go index 2d6c9786f9..951d12c06f 100644 --- a/p2p/host/autonat/autonat.go +++ b/p2p/host/autonat/autonat.go @@ -4,29 +4,17 @@ import ( "context" "errors" "math/rand" - "sync" + "sync/atomic" "time" "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/peerstore" - ma "github.com/multiformats/go-multiaddr" -) -// NATStatus is the state of NAT as detected by the ambient service. -type NATStatus int - -const ( - // NAT status is unknown; this means that the ambient service has not been - // able to decide the presence of NAT in the most recent attempt to test - // dial through known autonat peers. initial state. - NATStatusUnknown NATStatus = iota - // NAT status is publicly dialable - NATStatusPublic - // NAT status is private network - NATStatusPrivate + "github.com/libp2p/go-eventbus" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr-net" ) var ( @@ -39,7 +27,7 @@ var ( // AutoNAT is the interface for ambient NAT autodiscovery type AutoNAT interface { // Status returns the current NAT status - Status() NATStatus + Status() network.Reachability // PublicAddr returns the public dial address when NAT status is public and an // error otherwise PublicAddr() (ma.Multiaddr, error) @@ -52,20 +40,27 @@ type AmbientAutoNAT struct { getAddrs GetAddrs - mx sync.Mutex - peers map[peer.ID][]ma.Multiaddr - status NATStatus - addr ma.Multiaddr + inboundConn chan network.Conn + observations chan autoNATResult + // status is an autoNATResult reflecting current status. + status atomic.Value // Reflects the confidence on of the NATStatus being private, as a single // dialback may fail for reasons unrelated to NAT. // If it is <3, then multiple autoNAT peers may be contacted for dialback // If only a single autoNAT peer is known, then the confidence increases // for each failure until it reaches 3. - confidence int + confidence int + lastInbound time.Time + lastProbe time.Time + + subAddrUpdated event.Subscription + + emitReachabilityChanged event.Emitter +} - emitUnknown event.Emitter - emitPublic event.Emitter - emitPrivate event.Emitter +type autoNATResult struct { + network.Reachability + address ma.Multiaddr } // NewAutoNAT creates a new ambient NAT autodiscovery instance attached to a host @@ -75,21 +70,22 @@ func NewAutoNAT(ctx context.Context, h host.Host, getAddrs GetAddrs) AutoNAT { getAddrs = h.Addrs } - emitUnknown, _ := h.EventBus().Emitter(new(event.EvtLocalRoutabilityUnknown)) - emitPublic, _ := h.EventBus().Emitter(new(event.EvtLocalRoutabilityPublic)) - emitPrivate, _ := h.EventBus().Emitter(new(event.EvtLocalRoutabilityPrivate)) + subAddrUpdated, _ := h.EventBus().Subscribe(new(event.EvtLocalAddressesUpdated)) + + emitReachabilityChanged, _ := h.EventBus().Emitter(new(event.EvtLocalReachabilityChanged), eventbus.Stateful) as := &AmbientAutoNAT{ - ctx: ctx, - host: h, - getAddrs: getAddrs, - peers: make(map[peer.ID][]ma.Multiaddr), - status: NATStatusUnknown, - - emitUnknown: emitUnknown, - emitPublic: emitPublic, - emitPrivate: emitPrivate, + ctx: ctx, + host: h, + getAddrs: getAddrs, + inboundConn: make(chan network.Conn, 5), + observations: make(chan autoNATResult, 1), + + subAddrUpdated: subAddrUpdated, + + emitReachabilityChanged: emitReachabilityChanged, } + as.status.Store(autoNATResult{network.ReachabilityUnknown, nil}) h.Network().Notify(as) go as.background() @@ -97,177 +93,222 @@ func NewAutoNAT(ctx context.Context, h host.Host, getAddrs GetAddrs) AutoNAT { return as } -func (as *AmbientAutoNAT) Status() NATStatus { - as.mx.Lock() - defer as.mx.Unlock() - return as.status +// Status returns the AutoNAT observed reachability status. +func (as *AmbientAutoNAT) Status() network.Reachability { + s := as.status.Load().(autoNATResult) + return s.Reachability } -func (as *AmbientAutoNAT) updateStatus(s NATStatus) { - as.status = s - switch s { - case NATStatusUnknown: - as.emitUnknown.Emit(event.EvtLocalRoutabilityUnknown{}) - case NATStatusPublic: - as.emitPublic.Emit(event.EvtLocalRoutabilityPublic{}) - case NATStatusPrivate: - as.emitPrivate.Emit(event.EvtLocalRoutabilityPrivate{}) - } +func (as *AmbientAutoNAT) emitStatus() { + status := as.status.Load().(autoNATResult) + as.emitReachabilityChanged.Emit(event.EvtLocalReachabilityChanged{Reachability: status.Reachability}) } +// PublicAddr returns the publicly connectable Multiaddr of this node if one is known. func (as *AmbientAutoNAT) PublicAddr() (ma.Multiaddr, error) { - as.mx.Lock() - defer as.mx.Unlock() - - if as.status != NATStatusPublic { + s := as.status.Load().(autoNATResult) + if s.Reachability != network.ReachabilityPublic { return nil, errors.New("NAT Status is not public") } - return as.addr, nil + return s.address, nil +} + +func ipInList(candidate ma.Multiaddr, list []ma.Multiaddr) bool { + candidateIP, _ := manet.ToIP(candidate) + for _, i := range list { + if ip, err := manet.ToIP(i); err == nil && ip.Equal(candidateIP) { + return true + } + } + return false } func (as *AmbientAutoNAT) background() { // wait a bit for the node to come online and establish some connections // before starting autodetection - select { - case <-time.After(AutoNATBootDelay): - case <-as.ctx.Done(): - return - } + delay := AutoNATBootDelay - for { - as.autodetect() + var lastAddrUpdated time.Time + addrUpdatedChan := as.subAddrUpdated.Out() + defer as.subAddrUpdated.Close() + defer as.emitReachabilityChanged.Close() - delay := AutoNATRefreshInterval - if as.status == NATStatusUnknown { - delay = AutoNATRetryInterval - } + timer := time.NewTimer(delay) + defer timer.Stop() + timerRunning := true + for { select { - case <-time.After(delay): + // new connection occured. + case conn := <-as.inboundConn: + localAddrs := as.host.Addrs() + ca := as.status.Load().(autoNATResult) + if ca.address != nil { + localAddrs = append(localAddrs, ca.address) + } + if !ipInList(conn.RemoteMultiaddr(), localAddrs) { + as.lastInbound = time.Now() + } + + case <-addrUpdatedChan: + if !lastAddrUpdated.Add(time.Second).After(time.Now()) { + lastAddrUpdated = time.Now() + if as.confidence > 1 { + as.confidence-- + } + } + + // probe finished. + case result, ok := <-as.observations: + if !ok { + return + } + as.recordObservation(result) + case <-timer.C: + timerRunning = false case <-as.ctx.Done(): return } - } -} - -func (as *AmbientAutoNAT) autodetect() { - peers := as.getPeers() - - if len(peers) == 0 { - log.Debugf("skipping NAT auto detection; no autonat peers") - return - } - - cli := NewAutoNATClient(as.host, as.getAddrs) - ctx, cancel := context.WithTimeout(as.ctx, AutoNATRequestTimeout) - defer cancel() - var result struct { - sync.Mutex - private int - public int - pubaddr ma.Multiaddr + // Drain the timer channel if it hasn't fired in preparation for Resetting it. + if timerRunning && !timer.Stop() { + <-timer.C + } + timer.Reset(as.scheduleProbe()) + timerRunning = true } +} - probe := 3 - as.confidence - if probe == 0 { - probe = 1 - } - if probe > len(peers) { - probe = len(peers) +// scheduleProbe calculates when the next probe should be scheduled for, +// and launches it if that time is now. +func (as *AmbientAutoNAT) scheduleProbe() time.Duration { + // Our baseline is a probe every 'AutoNATRefreshInterval' + // This is modulated by: + // * recent inbound connections make us willing to wait up to 2x longer between probes. + // * low confidence makes us speed up between probes. + fixedNow := time.Now() + currentStatus := as.status.Load().(autoNATResult) + + nextProbe := fixedNow + if !as.lastProbe.IsZero() { + untilNext := AutoNATRefreshInterval + if currentStatus.Reachability == network.ReachabilityUnknown { + untilNext = AutoNATRetryInterval + } else if as.confidence < 3 { + untilNext = AutoNATRetryInterval + } else if currentStatus.Reachability == network.ReachabilityPublic && as.lastInbound.After(as.lastProbe) { + untilNext *= 2 + } + nextProbe = as.lastProbe.Add(untilNext) } - - var wg sync.WaitGroup - - for _, pi := range peers[:probe] { - wg.Add(1) - go func(pi peer.AddrInfo) { - defer wg.Done() - - as.host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL) - a, err := cli.DialBack(ctx, pi.ID) - - switch { - case err == nil: - log.Debugf("Dialback through %s successful; public address is %s", pi.ID.Pretty(), a.String()) - result.Lock() - result.public++ - result.pubaddr = a - result.Unlock() - - case IsDialError(err): - log.Debugf("Dialback through %s failed", pi.ID.Pretty()) - result.Lock() - result.private++ - result.Unlock() - - default: - log.Debugf("Dialback error through %s: %s", pi.ID.Pretty(), err) - } - }(pi) + if fixedNow.After(nextProbe) || fixedNow == nextProbe { + go as.probeNextPeer() + return AutoNATRetryInterval } + return nextProbe.Sub(fixedNow) +} - wg.Wait() - - as.mx.Lock() - if result.public > 0 { +// Update the current status based on an observed result. +func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) { + currentStatus := as.status.Load().(autoNATResult) + if observation.Reachability == network.ReachabilityPublic { log.Debugf("NAT status is public") - if as.status == NATStatusPrivate { + changed := false + if currentStatus.Reachability != network.ReachabilityPublic { // we are flipping our NATStatus, so confidence drops to 0 as.confidence = 0 + changed = true } else if as.confidence < 3 { as.confidence++ } - as.addr = result.pubaddr - as.updateStatus(NATStatusPublic) - } else if result.private > 0 { + if observation.address != nil { + if !changed && currentStatus.address != nil && !observation.address.Equal(currentStatus.address) { + as.confidence-- + } + if currentStatus.address == nil || !observation.address.Equal(currentStatus.address) { + changed = true + } + as.status.Store(observation) + } + if observation.address != nil && changed { + as.emitStatus() + } + } else if observation.Reachability == network.ReachabilityPrivate { log.Debugf("NAT status is private") - if as.status == NATStatusPublic { - // we are flipping our NATStatus, so confidence drops to 0 - as.confidence = 0 + if currentStatus.Reachability == network.ReachabilityPublic { + if as.confidence > 0 { + as.confidence-- + } else { + // we are flipping our NATStatus, so confidence drops to 0 + as.confidence = 0 + as.status.Store(observation) + as.emitStatus() + } } else if as.confidence < 3 { as.confidence++ + as.status.Store(observation) + if currentStatus.Reachability != network.ReachabilityPrivate { + as.emitStatus() + } } - as.addr = nil - as.updateStatus(NATStatusPrivate) } else if as.confidence > 0 { // don't just flip to unknown, reduce confidence first as.confidence-- } else { log.Debugf("NAT status is unknown") - as.addr = nil - as.updateStatus(NATStatusUnknown) + as.status.Store(autoNATResult{network.ReachabilityUnknown, nil}) + if currentStatus.Reachability != network.ReachabilityUnknown { + as.emitStatus() + } } - as.mx.Unlock() } -func (as *AmbientAutoNAT) getPeers() []peer.AddrInfo { - as.mx.Lock() - defer as.mx.Unlock() +func (as *AmbientAutoNAT) probe(pi *peer.AddrInfo) { + cli := NewAutoNATClient(as.host, as.getAddrs) + ctx, cancel := context.WithTimeout(as.ctx, AutoNATRequestTimeout) + defer cancel() + + a, err := cli.DialBack(ctx, pi.ID) + + switch { + case err == nil: + log.Debugf("Dialback through %s successful; public address is %s", pi.ID.Pretty(), a.String()) + as.observations <- autoNATResult{network.ReachabilityPublic, a} + case IsDialError(err): + log.Debugf("Dialback through %s failed", pi.ID.Pretty()) + as.observations <- autoNATResult{network.ReachabilityPrivate, nil} + default: + as.observations <- autoNATResult{network.ReachabilityUnknown, nil} + } +} - if len(as.peers) == 0 { - return nil +func (as *AmbientAutoNAT) probeNextPeer() { + peers := as.host.Network().Peers() + if len(peers) == 0 { + return } - var connected, others []peer.AddrInfo + addrs := make([]peer.AddrInfo, 0, len(peers)) - for p, addrs := range as.peers { - if as.host.Network().Connectedness(p) == network.Connected { - connected = append(connected, peer.AddrInfo{ID: p, Addrs: addrs}) - } else { - others = append(others, peer.AddrInfo{ID: p, Addrs: addrs}) + for _, p := range peers { + info := as.host.Peerstore().PeerInfo(p) + // Exclude peers which don't support the autonat protocol. + if proto, err := as.host.Peerstore().SupportsProtocols(p, AutoNATProto); len(proto) == 0 || err != nil { + continue } + addrs = append(addrs, info) } + // TODO: track and exclude recently probed peers. - shufflePeers(connected) - - if len(connected) < 3 { - shufflePeers(others) - return append(connected, others...) - } else { - return connected + if len(addrs) == 0 { + return } + + shufflePeers(addrs) + + as.lastProbe = time.Now() + as.probe(&addrs[0]) } func shufflePeers(peers []peer.AddrInfo) { diff --git a/p2p/host/autonat/autonat_test.go b/p2p/host/autonat/autonat_test.go index 74a429ca4f..ddf6d5cde2 100644 --- a/p2p/host/autonat/autonat_test.go +++ b/p2p/host/autonat/autonat_test.go @@ -18,7 +18,7 @@ import ( ) func init() { - AutoNATBootDelay = 1 * time.Second + AutoNATBootDelay = 100 * time.Millisecond AutoNATRefreshInterval = 1 * time.Second AutoNATRetryInterval = 1 * time.Second AutoNATIdentifyDelay = 100 * time.Millisecond @@ -73,10 +73,9 @@ func newDialResponseError(status pb.Message_ResponseStatus, text string) *pb.Mes func makeAutoNAT(ctx context.Context, t *testing.T, ash host.Host) (host.Host, AutoNAT) { h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx)) + h.Peerstore().AddAddrs(ash.ID(), ash.Addrs(), time.Minute) + h.Peerstore().AddProtocols(ash.ID(), AutoNATProto) a := NewAutoNAT(ctx, h, nil) - a.(*AmbientAutoNAT).mx.Lock() - a.(*AmbientAutoNAT).peers[ash.ID()] = ash.Addrs() - a.(*AmbientAutoNAT).mx.Unlock() return h, a } @@ -88,6 +87,19 @@ func connect(t *testing.T, a, b host.Host) { } } +func expectEvent(t *testing.T, s event.Subscription, expected network.Reachability) { + select { + case e := <-s.Out(): + ev, ok := e.(event.EvtLocalReachabilityChanged) + if !ok || ev.Reachability != expected { + t.Fatal("got wrong event type from the bus") + } + + case <-time.After(100 * time.Millisecond): + t.Fatal("failed to get the reachability event from the bus") + } +} + // tests func TestAutoNATPrivate(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) @@ -97,37 +109,57 @@ func TestAutoNATPrivate(t *testing.T) { hc, an := makeAutoNAT(ctx, t, hs) // subscribe to AutoNat events - s, err := hc.EventBus().Subscribe(&event.EvtLocalRoutabilityPrivate{}) + s, err := hc.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{}) if err != nil { - t.Fatalf("failed to subscribe to event EvtLocalRoutabilityPrivate, err=%s", err) + t.Fatalf("failed to subscribe to event EvtLocalReachabilityChanged, err=%s", err) } status := an.Status() - if status != NATStatusUnknown { + if status != network.ReachabilityUnknown { t.Fatalf("unexpected NAT status: %d", status) } connect(t, hs, hc) - time.Sleep(2 * time.Second) + time.Sleep(1 * time.Second) status = an.Status() - if status != NATStatusPrivate { + if status != network.ReachabilityPrivate { t.Fatalf("unexpected NAT status: %d", status) } - select { - case e := <-s.Out(): - _, ok := e.(event.EvtLocalRoutabilityPrivate) - if !ok { - t.Fatal("got wrong event type from the bus") - } + expectEvent(t, s, network.ReachabilityPrivate) +} + +func TestAutoNATPublic(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - case <-time.After(1 * time.Second): - t.Fatal("failed to get the EvtLocalRoutabilityPrivate event from the bus") + hs := makeAutoNATServicePublic(ctx, t) + hc, an := makeAutoNAT(ctx, t, hs) + + // subscribe to AutoNat events + s, err := hc.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{}) + if err != nil { + t.Fatalf("failed to subscribe to event EvtLocalReachabilityChanged, err=%s", err) } + + status := an.Status() + if status != network.ReachabilityUnknown { + t.Fatalf("unexpected NAT status: %d", status) + } + + connect(t, hs, hc) + time.Sleep(200 * time.Millisecond) + + status = an.Status() + if status != network.ReachabilityPublic { + t.Fatalf("unexpected NAT status: %d", status) + } + + expectEvent(t, s, network.ReachabilityPublic) } -func TestAutoNATPublic(t *testing.T) { +func TestAutoNATPublictoPrivate(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -135,32 +167,89 @@ func TestAutoNATPublic(t *testing.T) { hc, an := makeAutoNAT(ctx, t, hs) // subscribe to AutoNat events - s, err := hc.EventBus().Subscribe(&event.EvtLocalRoutabilityPublic{}) + s, err := hc.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{}) if err != nil { t.Fatalf("failed to subscribe to event EvtLocalRoutabilityPublic, err=%s", err) } status := an.Status() - if status != NATStatusUnknown { + if status != network.ReachabilityUnknown { t.Fatalf("unexpected NAT status: %d", status) } connect(t, hs, hc) + time.Sleep(200 * time.Millisecond) + + status = an.Status() + if status != network.ReachabilityPublic { + t.Fatalf("unexpected NAT status: %d", status) + } + + expectEvent(t, s, network.ReachabilityPublic) + + hs.SetStreamHandler(AutoNATProto, sayAutoNATPrivate) time.Sleep(2 * time.Second) status = an.Status() - if status != NATStatusPublic { + if status != network.ReachabilityPrivate { t.Fatalf("unexpected NAT status: %d", status) } +} + +func TestAutoNATObservationRecording(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hs := makeAutoNATServicePublic(ctx, t) + hc, ani := makeAutoNAT(ctx, t, hs) + an := ani.(*AmbientAutoNAT) + + s, err := hc.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{}) + if err != nil { + t.Fatalf("failed to subscribe to event EvtLocalRoutabilityPublic, err=%s", err) + } + + // pubic observation without address should be ignored. + an.recordObservation(autoNATResult{network.ReachabilityPublic, nil}) + if an.Status() != network.ReachabilityUnknown { + t.Fatalf("unexpected transition") + } select { - case e := <-s.Out(): - _, ok := e.(event.EvtLocalRoutabilityPublic) - if !ok { - t.Fatal("got wrong event type from the bus") - } + case _ = <-s.Out(): + t.Fatal("not expecting a public reachability event") + default: + //expected + } + + addr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/udp/1234") + an.recordObservation(autoNATResult{network.ReachabilityPublic, addr}) + if an.Status() != network.ReachabilityPublic { + t.Fatalf("failed to transition to public.") + } - case <-time.After(1 * time.Second): - t.Fatal("failed to get the EvtLocalRoutabilityPublic event from the bus") + expectEvent(t, s, network.ReachabilityPublic) + + // a single recording should have confidence still at 0, and transition to private quickly. + an.recordObservation(autoNATResult{network.ReachabilityPrivate, nil}) + if an.Status() != network.ReachabilityPrivate { + t.Fatalf("failed to transition to private.") + } + + expectEvent(t, s, network.ReachabilityPrivate) + + // stronger public confidence should be harder to undo. + an.recordObservation(autoNATResult{network.ReachabilityPublic, addr}) + an.recordObservation(autoNATResult{network.ReachabilityPublic, addr}) + if an.Status() != network.ReachabilityPublic { + t.Fatalf("failed to transition to public.") } + + expectEvent(t, s, network.ReachabilityPublic) + + an.recordObservation(autoNATResult{network.ReachabilityPrivate, nil}) + if an.Status() != network.ReachabilityPublic { + t.Fatalf("too-extreme private transition.") + } + } diff --git a/p2p/host/autonat/notify.go b/p2p/host/autonat/notify.go index 4ea6561603..ed8d8702dc 100644 --- a/p2p/host/autonat/notify.go +++ b/p2p/host/autonat/notify.go @@ -6,6 +6,7 @@ import ( "github.com/libp2p/go-libp2p-core/network" ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr-net" ) var _ network.Notifiee = (*AmbientAutoNAT)(nil) @@ -18,25 +19,13 @@ func (as *AmbientAutoNAT) OpenedStream(net network.Network, s network.Stream) {} func (as *AmbientAutoNAT) ClosedStream(net network.Network, s network.Stream) {} func (as *AmbientAutoNAT) Connected(net network.Network, c network.Conn) { - p := c.RemotePeer() - - go func() { - // add some delay for identify - time.Sleep(AutoNATIdentifyDelay) - - protos, err := as.host.Peerstore().SupportsProtocols(p, AutoNATProto) - if err != nil { - log.Debugf("error retrieving supported protocols for peer %s: %s", p, err) - return - } - - if len(protos) > 0 { - log.Infof("Discovered AutoNAT peer %s", p.Pretty()) - as.mx.Lock() - as.peers[p] = as.host.Peerstore().Addrs(p) - as.mx.Unlock() + if c.Stat().Direction == network.DirInbound && + manet.IsPublicAddr(c.RemoteMultiaddr()) { + select { + case as.inboundConn <- c: + default: } - }() + } } func (as *AmbientAutoNAT) Disconnected(net network.Network, c network.Conn) {}