diff --git a/p2p/host/autonat/autonat.go b/p2p/host/autonat/autonat.go index 7368cccfe4..eee5481aea 100644 --- a/p2p/host/autonat/autonat.go +++ b/p2p/host/autonat/autonat.go @@ -33,7 +33,7 @@ type AmbientAutoNAT struct { inboundConn chan network.Conn observations chan autoNATResult // status is an autoNATResult reflecting current status. - status atomic.Value + status atomic.Pointer[autoNATResult] // Reflects the confidence on of the NATStatus being private, as a single // dialback may fail for reasons unrelated to NAT. // If it is <3, then multiple autoNAT peers may be contacted for dialback @@ -117,7 +117,7 @@ func New(h host.Host, options ...Option) (AutoNAT, error) { service: service, recentProbes: make(map[peer.ID]time.Time), } - as.status.Store(autoNATResult{network.ReachabilityUnknown, nil}) + as.status.Store(&autoNATResult{network.ReachabilityUnknown, nil}) subscriber, err := as.host.EventBus().Subscribe( []any{new(event.EvtLocalAddressesUpdated), new(event.EvtPeerIdentificationCompleted)}, @@ -136,18 +136,18 @@ func New(h host.Host, options ...Option) (AutoNAT, error) { // Status returns the AutoNAT observed reachability status. func (as *AmbientAutoNAT) Status() network.Reachability { - s := as.status.Load().(autoNATResult) + s := as.status.Load() return s.Reachability } func (as *AmbientAutoNAT) emitStatus() { - status := as.status.Load().(autoNATResult) + status := as.status.Load() as.emitReachabilityChanged.Emit(event.EvtLocalReachabilityChanged{Reachability: status.Reachability}) } // PublicAddr returns the publicly connectable Multiaddr of this node if one is known. func (as *AmbientAutoNAT) PublicAddr() (ma.Multiaddr, error) { - s := as.status.Load().(autoNATResult) + s := as.status.Load() if s.Reachability != network.ReachabilityPublic { return nil, errors.New("NAT status is not public") } @@ -184,7 +184,7 @@ func (as *AmbientAutoNAT) background() { // new inbound connection. case conn := <-as.inboundConn: localAddrs := as.host.Addrs() - ca := as.status.Load().(autoNATResult) + ca := as.status.Load() if ca.address != nil { localAddrs = append(localAddrs, ca.address) } @@ -204,7 +204,7 @@ func (as *AmbientAutoNAT) background() { } case event.EvtPeerIdentificationCompleted: if s, err := as.host.Peerstore().SupportsProtocols(e.Peer, AutoNATProto); err == nil && len(s) > 0 { - currentStatus := as.status.Load().(autoNATResult) + currentStatus := as.status.Load() if currentStatus.Reachability == network.ReachabilityUnknown { as.tryProbe(e.Peer) } @@ -253,7 +253,7 @@ func (as *AmbientAutoNAT) scheduleProbe() time.Duration { // * recent inbound connections (implying continued connectivity) should decrease the retry when public // * recent inbound connections when not public mean we should try more actively to see if we're public. fixedNow := time.Now() - currentStatus := as.status.Load().(autoNATResult) + currentStatus := as.status.Load() nextProbe := fixedNow // Don't look for peers in the peer store more than once per second. @@ -285,7 +285,7 @@ func (as *AmbientAutoNAT) scheduleProbe() time.Duration { // Update the current status based on an observed result. func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) { - currentStatus := as.status.Load().(autoNATResult) + currentStatus := as.status.Load() if observation.Reachability == network.ReachabilityPublic { log.Debugf("NAT status is public") changed := false @@ -306,7 +306,7 @@ func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) { if currentStatus.address == nil || !observation.address.Equal(currentStatus.address) { changed = true } - as.status.Store(observation) + as.status.Store(&observation) } if observation.address != nil && changed { as.emitStatus() @@ -319,7 +319,7 @@ func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) { } else { // we are flipping our NATStatus, so confidence drops to 0 as.confidence = 0 - as.status.Store(observation) + as.status.Store(&observation) if as.service != nil { as.service.Disable() } @@ -327,7 +327,7 @@ func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) { } } else if as.confidence < 3 { as.confidence++ - as.status.Store(observation) + as.status.Store(&observation) if currentStatus.Reachability != network.ReachabilityPrivate { as.emitStatus() } @@ -337,7 +337,7 @@ func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) { as.confidence-- } else { log.Debugf("NAT status is unknown") - as.status.Store(autoNATResult{network.ReachabilityUnknown, nil}) + as.status.Store(&autoNATResult{network.ReachabilityUnknown, nil}) if currentStatus.Reachability != network.ReachabilityUnknown { if as.service != nil { as.service.Enable() diff --git a/p2p/net/connmgr/decay.go b/p2p/net/connmgr/decay.go index ca39de8693..cf5a76acbe 100644 --- a/p2p/net/connmgr/decay.go +++ b/p2p/net/connmgr/decay.go @@ -38,7 +38,7 @@ type decayer struct { knownTags map[string]*decayingTag // lastTick stores the last time the decayer ticked. Guarded by atomic. - lastTick atomic.Value + lastTick atomic.Pointer[time.Time] // bumpTagCh queues bump commands to be processed by the loop. bumpTagCh chan bumpCmd @@ -89,7 +89,8 @@ func NewDecayer(cfg *DecayerCfg, mgr *BasicConnMgr) (*decayer, error) { doneCh: make(chan struct{}), } - d.lastTick.Store(d.clock.Now()) + now := d.clock.Now() + d.lastTick.Store(&now) // kick things off. go d.process() @@ -116,7 +117,7 @@ func (d *decayer) RegisterDecayingTag(name string, interval time.Duration, decay "some precision may be lost", name, interval, d.cfg.Resolution) } - lastTick := d.lastTick.Load().(time.Time) + lastTick := d.lastTick.Load() tag := &decayingTag{ trkr: d, name: name, @@ -163,7 +164,8 @@ func (d *decayer) process() { for { select { case now = <-ticker.C: - d.lastTick.Store(now) + nn := now + d.lastTick.Store(&nn) d.tagsMu.Lock() for _, tag := range d.knownTags { diff --git a/p2p/net/mock/mock_stream.go b/p2p/net/mock/mock_stream.go index a78329d306..f6cb842e75 100644 --- a/p2p/net/mock/mock_stream.go +++ b/p2p/net/mock/mock_stream.go @@ -31,7 +31,7 @@ type stream struct { writeErr error - protocol atomic.Value + protocol atomic.Pointer[protocol.ID] stat network.Stats } @@ -92,9 +92,11 @@ func (s *stream) ID() string { } func (s *stream) Protocol() protocol.ID { - // Ignore type error. It means that the protocol is unset. - p, _ := s.protocol.Load().(protocol.ID) - return p + p := s.protocol.Load() + if p == nil { + return "" + } + return *p } func (s *stream) Stat() network.Stats { @@ -102,7 +104,7 @@ func (s *stream) Stat() network.Stats { } func (s *stream) SetProtocol(proto protocol.ID) error { - s.protocol.Store(proto) + s.protocol.Store(&proto) return nil } diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 18d5183f52..327f5d6e10 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -146,7 +146,7 @@ type Swarm struct { maResolver *madns.Resolver // stream handlers - streamh atomic.Value + streamh atomic.Pointer[network.StreamHandler] // dialing helpers dsync *dialSync @@ -347,13 +347,16 @@ func (s *Swarm) Peerstore() peerstore.Peerstore { // SetStreamHandler assigns the handler for new streams. func (s *Swarm) SetStreamHandler(handler network.StreamHandler) { - s.streamh.Store(handler) + s.streamh.Store(&handler) } // StreamHandler gets the handler for new streams. func (s *Swarm) StreamHandler() network.StreamHandler { - handler, _ := s.streamh.Load().(network.StreamHandler) - return handler + handler := s.streamh.Load() + if handler == nil { + return nil + } + return *handler } // NewStream creates a new stream on any available connection to peer, dialing diff --git a/p2p/net/swarm/swarm_stream.go b/p2p/net/swarm/swarm_stream.go index 7a5bb27503..d372bcd8e4 100644 --- a/p2p/net/swarm/swarm_stream.go +++ b/p2p/net/swarm/swarm_stream.go @@ -24,7 +24,7 @@ type Stream struct { closeOnce sync.Once - protocol atomic.Value + protocol atomic.Pointer[protocol.ID] stat network.Stats } @@ -108,9 +108,11 @@ func (s *Stream) remove() { // Protocol returns the protocol negotiated on this stream (if set). func (s *Stream) Protocol() protocol.ID { - // Ignore type error. It means that the protocol is unset. - p, _ := s.protocol.Load().(protocol.ID) - return p + p := s.protocol.Load() + if p == nil { + return "" + } + return *p } // SetProtocol sets the protocol for this stream. @@ -123,7 +125,7 @@ func (s *Stream) SetProtocol(p protocol.ID) error { return err } - s.protocol.Store(p) + s.protocol.Store(&p) return nil }