Skip to content

Commit

Permalink
use atomic.Int32 and atomic.Int64 (#2096)
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt authored Feb 14, 2023
1 parent 3d9cc01 commit 0a40e7c
Show file tree
Hide file tree
Showing 14 changed files with 69 additions and 69 deletions.
4 changes: 2 additions & 2 deletions core/test/crypto.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (
ci "github.com/libp2p/go-libp2p/core/crypto"
)

var globalSeed int64
var globalSeed atomic.Int64

func RandTestKeyPair(typ, bits int) (ci.PrivKey, ci.PubKey, error) {
// workaround for low time resolution
seed := atomic.AddInt64(&globalSeed, 1)
seed := globalSeed.Add(1)
return SeededTestKeyPair(typ, bits, seed)
}

Expand Down
18 changes: 9 additions & 9 deletions p2p/host/autorelay/autorelay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,17 +251,17 @@ func TestBackoff(t *testing.T) {
)
require.NoError(t, err)
defer r.Close()
var reservations int32
var reservations atomic.Int32
r.SetStreamHandler(protoIDv2, func(str network.Stream) {
atomic.AddInt32(&reservations, 1)
reservations.Add(1)
str.Reset()
})

var counter int32 // to be used atomically
var counter atomic.Int32
h := newPrivateNode(t,
func(context.Context, int) <-chan peer.AddrInfo {
// always return the same node, and make sure we don't try to connect to it too frequently
atomic.AddInt32(&counter, 1)
counter.Add(1)
peerChan := make(chan peer.AddrInfo, 1)
peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}
close(peerChan)
Expand All @@ -275,16 +275,16 @@ func TestBackoff(t *testing.T) {
)
defer h.Close()

require.Eventually(t, func() bool { return atomic.LoadInt32(&reservations) == 1 }, 3*time.Second, 20*time.Millisecond)
require.Eventually(t, func() bool { return reservations.Load() == 1 }, 3*time.Second, 20*time.Millisecond)
// make sure we don't add any relays yet
for i := 0; i < 2; i++ {
cl.Add(backoff / 3)
require.Equal(t, 1, int(atomic.LoadInt32(&reservations)))
require.Equal(t, 1, int(reservations.Load()))
}
cl.Add(backoff / 2)
require.Eventually(t, func() bool { return atomic.LoadInt32(&reservations) == 2 }, 3*time.Second, 20*time.Millisecond)
require.Less(t, int(atomic.LoadInt32(&counter)), 100) // just make sure we're not busy-looping
require.Equal(t, 2, int(atomic.LoadInt32(&reservations)))
require.Eventually(t, func() bool { return reservations.Load() == 2 }, 3*time.Second, 20*time.Millisecond)
require.Less(t, int(counter.Load()), 100) // just make sure we're not busy-looping
require.Equal(t, 2, int(reservations.Load()))
}

func TestStaticRelays(t *testing.T) {
Expand Down
18 changes: 9 additions & 9 deletions p2p/host/eventbus/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (e *emitter) Close() error {
if !e.closed.CompareAndSwap(false, true) {
return fmt.Errorf("closed an emitter more than once")
}
if atomic.AddInt32(&e.n.nEmitters, -1) == 0 {
if e.n.nEmitters.Add(-1) == 0 {
e.dropper(e.typ)
}
return nil
Expand Down Expand Up @@ -100,7 +100,7 @@ func (b *basicBus) tryDropNode(typ reflect.Type) {
}

n.lk.Lock()
if atomic.LoadInt32(&n.nEmitters) > 0 || len(n.sinks) > 0 {
if n.nEmitters.Load() > 0 || len(n.sinks) > 0 {
n.lk.Unlock()
b.lk.Unlock()
return // still in use
Expand Down Expand Up @@ -178,7 +178,7 @@ func (s *sub) Close() error {
}
}

tryDrop := len(n.sinks) == 0 && atomic.LoadInt32(&n.nEmitters) == 0
tryDrop := len(n.sinks) == 0 && n.nEmitters.Load() == 0

n.lk.Unlock()

Expand Down Expand Up @@ -294,7 +294,7 @@ func (b *basicBus) Emitter(evtType interface{}, opts ...event.EmitterOpt) (e eve
typ = typ.Elem()

b.withNode(typ, func(n *node) {
atomic.AddInt32(&n.nEmitters, 1)
n.nEmitters.Add(1)
n.keepLast = n.keepLast || settings.makeStateful
e = &emitter{n: n, typ: typ, dropper: b.tryDropNode, w: b.wildcard, metricsTracer: b.metricsTracer}
}, nil)
Expand All @@ -319,13 +319,13 @@ func (b *basicBus) GetAllEventTypes() []reflect.Type {

type wildcardNode struct {
sync.RWMutex
nSinks int32
nSinks atomic.Int32
sinks []*namedSink
metricsTracer MetricsTracer
}

func (n *wildcardNode) addSink(sink *namedSink) {
atomic.AddInt32(&n.nSinks, 1) // ok to do outside the lock
n.nSinks.Add(1) // ok to do outside the lock
n.Lock()
n.sinks = append(n.sinks, sink)
n.Unlock()
Expand All @@ -336,7 +336,7 @@ func (n *wildcardNode) addSink(sink *namedSink) {
}

func (n *wildcardNode) removeSink(ch chan interface{}) {
atomic.AddInt32(&n.nSinks, -1) // ok to do outside the lock
n.nSinks.Add(-1) // ok to do outside the lock
n.Lock()
for i := 0; i < len(n.sinks); i++ {
if n.sinks[i].ch == ch {
Expand All @@ -349,7 +349,7 @@ func (n *wildcardNode) removeSink(ch chan interface{}) {
}

func (n *wildcardNode) emit(evt interface{}) {
if atomic.LoadInt32(&n.nSinks) == 0 {
if n.nSinks.Load() == 0 {
return
}

Expand All @@ -372,7 +372,7 @@ type node struct {
typ reflect.Type

// emitter ref count
nEmitters int32
nEmitters atomic.Int32

keepLast bool
last interface{}
Expand Down
12 changes: 6 additions & 6 deletions p2p/host/eventbus/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func TestClosingRaces(t *testing.T) {
func TestSubMany(t *testing.T) {
bus := NewBus()

var r int32
var r atomic.Int32

n := getN()
var wait sync.WaitGroup
Expand All @@ -215,7 +215,7 @@ func TestSubMany(t *testing.T) {
defer sub.Close()

ready.Done()
atomic.AddInt32(&r, int32((<-sub.Out()).(EventB)))
r.Add(int32((<-sub.Out()).(EventB)))
wait.Done()
}()
}
Expand All @@ -231,7 +231,7 @@ func TestSubMany(t *testing.T) {
em.Emit(EventB(7))
wait.Wait()

if int(r) != 7*n {
if int(r.Load()) != 7*n {
t.Error("got wrong result")
}
}
Expand Down Expand Up @@ -488,7 +488,7 @@ func testMany(t testing.TB, subs, emits, msgs int, stateful bool) {

bus := NewBus()

var r int64
var r atomic.Int64

var wait sync.WaitGroup
var ready sync.WaitGroup
Expand All @@ -509,7 +509,7 @@ func testMany(t testing.TB, subs, emits, msgs int, stateful bool) {
if !ok {
panic("wat")
}
atomic.AddInt64(&r, int64(e.(EventB)))
r.Add(int64(e.(EventB)))
}
wait.Done()
}()
Expand Down Expand Up @@ -538,7 +538,7 @@ func testMany(t testing.TB, subs, emits, msgs int, stateful bool) {

wait.Wait()

if int(r) != 97*subs*emits*msgs {
if int(r.Load()) != 97*subs*emits*msgs {
t.Fatal("got wrong result")
}
}
Expand Down
4 changes: 2 additions & 2 deletions p2p/host/eventbus/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type subSettings struct {
name string
}

var subCnt int64
var subCnt atomic.Int64

var subSettingsDefault = subSettings{
buffer: 16,
Expand All @@ -34,7 +34,7 @@ func newSubSettings() subSettings {
}
settings.name = fmt.Sprintf("%s-L%d", file, line)
} else {
settings.name = fmt.Sprintf("subscriber-%d", atomic.AddInt64(&subCnt, 1))
settings.name = fmt.Sprintf("subscriber-%d", subCnt.Add(1))
}
return settings
}
Expand Down
14 changes: 7 additions & 7 deletions p2p/net/connmgr/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type BasicConnMgr struct {

// channel-based semaphore that enforces only a single trim is in progress
trimMutex sync.Mutex
connCount int32
connCount atomic.Int32
// to be accessed atomically. This is mimicking the implementation of a sync.Once.
// Take care of correct alignment when modifying this struct.
trimCount uint64
Expand Down Expand Up @@ -158,7 +158,7 @@ func NewConnManager(low, hi int, opts ...Option) (*BasicConnMgr, error) {
// We don't pay attention to the silence period or the grace period.
// We try to not kill protected connections, but if that turns out to be necessary, not connection is safe!
func (cm *BasicConnMgr) memoryEmergency() {
connCount := int(atomic.LoadInt32(&cm.connCount))
connCount := int(cm.connCount.Load())
target := connCount - cm.cfg.lowWater
if target < 0 {
log.Warnw("Low on memory, but we only have a few connections", "num", connCount, "low watermark", cm.cfg.lowWater)
Expand Down Expand Up @@ -346,7 +346,7 @@ func (cm *BasicConnMgr) background() {
for {
select {
case <-ticker.C:
if atomic.LoadInt32(&cm.connCount) < int32(cm.cfg.highWater) {
if cm.connCount.Load() < int32(cm.cfg.highWater) {
// Below high water, skip.
continue
}
Expand Down Expand Up @@ -456,7 +456,7 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
return nil
}

if int(atomic.LoadInt32(&cm.connCount)) <= cm.cfg.lowWater {
if int(cm.connCount.Load()) <= cm.cfg.lowWater {
log.Info("open connection count below limit")
return nil
}
Expand Down Expand Up @@ -632,7 +632,7 @@ func (cm *BasicConnMgr) GetInfo() CMInfo {
LowWater: cm.cfg.lowWater,
LastTrim: lastTrim,
GracePeriod: cm.cfg.gracePeriod,
ConnCount: int(atomic.LoadInt32(&cm.connCount)),
ConnCount: int(cm.connCount.Load()),
}
}

Expand Down Expand Up @@ -686,7 +686,7 @@ func (nn *cmNotifee) Connected(n network.Network, c network.Conn) {
}

pinfo.conns[c] = cm.clock.Now()
atomic.AddInt32(&cm.connCount, 1)
cm.connCount.Add(1)
}

// Disconnected is called by notifiers to inform that an existing connection has been closed or terminated.
Expand Down Expand Up @@ -715,7 +715,7 @@ func (nn *cmNotifee) Disconnected(n network.Network, c network.Conn) {
if len(cinf.conns) == 0 {
delete(s.peers, p)
}
atomic.AddInt32(&cm.connCount, -1)
cm.connCount.Add(-1)
}

// Listen is no-op in this implementation.
Expand Down
10 changes: 5 additions & 5 deletions p2p/net/connmgr/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func TestDoubleConnection(t *testing.T) {
not.Connected(nil, conn)
cm.TagPeer(conn.RemotePeer(), "foo", 10)
not.Connected(nil, conn)
if cm.connCount != 1 {
if cm.connCount.Load() != 1 {
t.Fatal("unexpected number of connections")
}
if cm.segments.get(conn.RemotePeer()).peers[conn.RemotePeer()].value != 10 {
Expand All @@ -386,23 +386,23 @@ func TestDisconnected(t *testing.T) {
cm.TagPeer(conn.RemotePeer(), "foo", 10)

not.Disconnected(nil, randConn(t, nil))
if cm.connCount != 1 {
if cm.connCount.Load() != 1 {
t.Fatal("unexpected number of connections")
}
if cm.segments.get(conn.RemotePeer()).peers[conn.RemotePeer()].value != 10 {
t.Fatal("unexpected peer value")
}

not.Disconnected(nil, &tconn{peer: conn.RemotePeer()})
if cm.connCount != 1 {
if cm.connCount.Load() != 1 {
t.Fatal("unexpected number of connections")
}
if cm.segments.get(conn.RemotePeer()).peers[conn.RemotePeer()].value != 10 {
t.Fatal("unexpected peer value")
}

not.Disconnected(nil, conn)
if cm.connCount != 0 {
if cm.connCount.Load() != 0 {
t.Fatal("unexpected number of connections")
}
if cm.segments.countPeers() != 0 {
Expand Down Expand Up @@ -495,7 +495,7 @@ func TestQuickBurstRespectsSilencePeriod(t *testing.T) {
if closed > 20 {
t.Fatalf("should have closed at most 20 connections, closed: %d", closed)
}
if total := closed + int(cm.connCount); total != 30 {
if total := closed + int(cm.connCount.Load()); total != 30 {
t.Fatalf("expected closed connections + open conn count to equal 30, value: %d", total)
}
}
Expand Down
4 changes: 2 additions & 2 deletions p2p/net/mock/mock_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
manet "github.com/multiformats/go-multiaddr/net"
)

var connCounter int64
var connCounter atomic.Int64

// conn represents one side's perspective of a
// live connection between two peers.
Expand Down Expand Up @@ -49,7 +49,7 @@ func newConn(ln, rn *peernet, l *link, dir network.Direction) *conn {
c.local = ln.peer
c.remote = rn.peer
c.stat.Direction = dir
c.id = atomic.AddInt64(&connCounter, 1)
c.id = connCounter.Add(1)

c.localAddr = ln.ps.Addrs(ln.peer)[0]
for _, a := range rn.ps.Addrs(rn.peer) {
Expand Down
4 changes: 2 additions & 2 deletions p2p/net/mock/mock_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"
)

var streamCounter int64
var streamCounter atomic.Int64

// stream implements network.Stream
type stream struct {
Expand Down Expand Up @@ -57,7 +57,7 @@ func newStream(w *io.PipeWriter, r *io.PipeReader, dir network.Direction) *strea
s := &stream{
read: r,
write: w,
id: atomic.AddInt64(&streamCounter, 1),
id: streamCounter.Add(1),
reset: make(chan struct{}, 1),
close: make(chan struct{}, 1),
closed: make(chan struct{}),
Expand Down
10 changes: 5 additions & 5 deletions p2p/net/upgrader/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,32 +261,32 @@ func TestAcceptQueueBacklogged(t *testing.T) {
defer ln.Close()

// setup AcceptQueueLength connections, but don't accept any of them
var counter int32 // to be used atomically
var counter atomic.Int32
doDial := func() {
conn, err := dial(t, u, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)
atomic.AddInt32(&counter, 1)
counter.Add(1)
t.Cleanup(func() { conn.Close() })
}

for i := 0; i < upgrader.AcceptQueueLength; i++ {
go doDial()
}

require.Eventually(func() bool { return int(atomic.LoadInt32(&counter)) == upgrader.AcceptQueueLength }, 2*time.Second, 50*time.Millisecond)
require.Eventually(func() bool { return int(counter.Load()) == upgrader.AcceptQueueLength }, 2*time.Second, 50*time.Millisecond)

// dial a new connection. This connection should not complete setup, since the queue is full
go doDial()

time.Sleep(100 * time.Millisecond)
require.Equal(int(atomic.LoadInt32(&counter)), upgrader.AcceptQueueLength)
require.Equal(int(counter.Load()), upgrader.AcceptQueueLength)

// accept a single connection. Now the new connection should be set up, and fill the queue again
conn, err := ln.Accept()
require.NoError(err)
require.NoError(conn.Close())

require.Eventually(func() bool { return int(atomic.LoadInt32(&counter)) == upgrader.AcceptQueueLength+1 }, 2*time.Second, 50*time.Millisecond)
require.Eventually(func() bool { return int(counter.Load()) == upgrader.AcceptQueueLength+1 }, 2*time.Second, 50*time.Millisecond)
}

func TestListenerConnectionGater(t *testing.T) {
Expand Down
Loading

0 comments on commit 0a40e7c

Please sign in to comment.