use atomic.Int32 and atomic.Int64 #2096

Merged 1 commit on Feb 14, 2023
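The change is mechanical throughout: counters declared as plain int32/int64 and manipulated through the sync/atomic free functions become the atomic.Int32 and atomic.Int64 types added in Go 1.19, whose Add/Load methods make atomic-only access explicit and rule out accidental non-atomic reads and writes. A minimal before/after sketch of the pattern (hypothetical counter names, not code from this PR):

package main

import (
	"fmt"
	"sync/atomic"
)

// Old style: a plain integer that every caller must remember to access atomically.
var legacyCount int64

// New style (Go 1.19+): the type itself only exposes atomic operations.
var count atomic.Int64

func main() {
	atomic.AddInt64(&legacyCount, 1) // old: free function operating on a *int64
	fmt.Println(atomic.LoadInt64(&legacyCount))

	count.Add(1) // new: method on atomic.Int64
	fmt.Println(count.Load())
}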
4 changes: 2 additions & 2 deletions core/test/crypto.go
@@ -7,11 +7,11 @@ import (
 	ci "github.com/libp2p/go-libp2p/core/crypto"
 )

-var globalSeed int64
+var globalSeed atomic.Int64

 func RandTestKeyPair(typ, bits int) (ci.PrivKey, ci.PubKey, error) {
 	// workaround for low time resolution
-	seed := atomic.AddInt64(&globalSeed, 1)
+	seed := globalSeed.Add(1)
 	return SeededTestKeyPair(typ, bits, seed)
 }
18 changes: 9 additions & 9 deletions p2p/host/autorelay/autorelay_test.go
@@ -251,17 +251,17 @@ func TestBackoff(t *testing.T) {
 	)
 	require.NoError(t, err)
 	defer r.Close()
-	var reservations int32
+	var reservations atomic.Int32
 	r.SetStreamHandler(protoIDv2, func(str network.Stream) {
-		atomic.AddInt32(&reservations, 1)
+		reservations.Add(1)
 		str.Reset()
 	})

-	var counter int32 // to be used atomically
+	var counter atomic.Int32
 	h := newPrivateNode(t,
 		func(context.Context, int) <-chan peer.AddrInfo {
 			// always return the same node, and make sure we don't try to connect to it too frequently
-			atomic.AddInt32(&counter, 1)
+			counter.Add(1)
 			peerChan := make(chan peer.AddrInfo, 1)
 			peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}
 			close(peerChan)
@@ -275,16 +275,16 @@ func TestBackoff(t *testing.T) {
 	)
 	defer h.Close()

-	require.Eventually(t, func() bool { return atomic.LoadInt32(&reservations) == 1 }, 3*time.Second, 20*time.Millisecond)
+	require.Eventually(t, func() bool { return reservations.Load() == 1 }, 3*time.Second, 20*time.Millisecond)
 	// make sure we don't add any relays yet
 	for i := 0; i < 2; i++ {
 		cl.Add(backoff / 3)
-		require.Equal(t, 1, int(atomic.LoadInt32(&reservations)))
+		require.Equal(t, 1, int(reservations.Load()))
 	}
 	cl.Add(backoff / 2)
-	require.Eventually(t, func() bool { return atomic.LoadInt32(&reservations) == 2 }, 3*time.Second, 20*time.Millisecond)
-	require.Less(t, int(atomic.LoadInt32(&counter)), 100) // just make sure we're not busy-looping
-	require.Equal(t, 2, int(atomic.LoadInt32(&reservations)))
+	require.Eventually(t, func() bool { return reservations.Load() == 2 }, 3*time.Second, 20*time.Millisecond)
+	require.Less(t, int(counter.Load()), 100) // just make sure we're not busy-looping
+	require.Equal(t, 2, int(reservations.Load()))
 }

 func TestStaticRelays(t *testing.T) {
18 changes: 9 additions & 9 deletions p2p/host/eventbus/basic.go
@@ -50,7 +50,7 @@ func (e *emitter) Close() error {
 	if !e.closed.CompareAndSwap(false, true) {
 		return fmt.Errorf("closed an emitter more than once")
 	}
-	if atomic.AddInt32(&e.n.nEmitters, -1) == 0 {
+	if e.n.nEmitters.Add(-1) == 0 {
 		e.dropper(e.typ)
 	}
 	return nil
@@ -100,7 +100,7 @@ func (b *basicBus) tryDropNode(typ reflect.Type) {
 	}

 	n.lk.Lock()
-	if atomic.LoadInt32(&n.nEmitters) > 0 || len(n.sinks) > 0 {
+	if n.nEmitters.Load() > 0 || len(n.sinks) > 0 {
 		n.lk.Unlock()
 		b.lk.Unlock()
 		return // still in use
@@ -178,7 +178,7 @@ func (s *sub) Close() error {
 		}
 	}

-	tryDrop := len(n.sinks) == 0 && atomic.LoadInt32(&n.nEmitters) == 0
+	tryDrop := len(n.sinks) == 0 && n.nEmitters.Load() == 0

 	n.lk.Unlock()

@@ -294,7 +294,7 @@ func (b *basicBus) Emitter(evtType interface{}, opts ...event.EmitterOpt) (e eve
 	typ = typ.Elem()

 	b.withNode(typ, func(n *node) {
-		atomic.AddInt32(&n.nEmitters, 1)
+		n.nEmitters.Add(1)
 		n.keepLast = n.keepLast || settings.makeStateful
 		e = &emitter{n: n, typ: typ, dropper: b.tryDropNode, w: b.wildcard, metricsTracer: b.metricsTracer}
 	}, nil)
@@ -319,13 +319,13 @@ func (b *basicBus) GetAllEventTypes() []reflect.Type {

 type wildcardNode struct {
 	sync.RWMutex
-	nSinks        int32
+	nSinks        atomic.Int32
 	sinks         []*namedSink
 	metricsTracer MetricsTracer
 }

 func (n *wildcardNode) addSink(sink *namedSink) {
-	atomic.AddInt32(&n.nSinks, 1) // ok to do outside the lock
+	n.nSinks.Add(1) // ok to do outside the lock
 	n.Lock()
 	n.sinks = append(n.sinks, sink)
 	n.Unlock()
@@ -336,7 +336,7 @@ func (n *wildcardNode) addSink(sink *namedSink) {
 }

 func (n *wildcardNode) removeSink(ch chan interface{}) {
-	atomic.AddInt32(&n.nSinks, -1) // ok to do outside the lock
+	n.nSinks.Add(-1) // ok to do outside the lock
 	n.Lock()
 	for i := 0; i < len(n.sinks); i++ {
 		if n.sinks[i].ch == ch {
@@ -349,7 +349,7 @@ func (n *wildcardNode) removeSink(ch chan interface{}) {
 }

 func (n *wildcardNode) emit(evt interface{}) {
-	if atomic.LoadInt32(&n.nSinks) == 0 {
+	if n.nSinks.Load() == 0 {
 		return
 	}

@@ -372,7 +372,7 @@ type node struct {
 	typ reflect.Type

 	// emitter ref count
-	nEmitters int32
+	nEmitters atomic.Int32

 	keepLast bool
 	last     interface{}
12 changes: 6 additions & 6 deletions p2p/host/eventbus/basic_test.go
@@ -198,7 +198,7 @@ func TestClosingRaces(t *testing.T) {
 func TestSubMany(t *testing.T) {
 	bus := NewBus()

-	var r int32
+	var r atomic.Int32

 	n := getN()
 	var wait sync.WaitGroup
@@ -215,7 +215,7 @@ func TestSubMany(t *testing.T) {
 			defer sub.Close()

 			ready.Done()
-			atomic.AddInt32(&r, int32((<-sub.Out()).(EventB)))
+			r.Add(int32((<-sub.Out()).(EventB)))
 			wait.Done()
 		}()
 	}
@@ -231,7 +231,7 @@ func TestSubMany(t *testing.T) {
 	em.Emit(EventB(7))
 	wait.Wait()

-	if int(r) != 7*n {
+	if int(r.Load()) != 7*n {
 		t.Error("got wrong result")
 	}
 }
@@ -488,7 +488,7 @@ func testMany(t testing.TB, subs, emits, msgs int, stateful bool) {

 	bus := NewBus()

-	var r int64
+	var r atomic.Int64

 	var wait sync.WaitGroup
 	var ready sync.WaitGroup
@@ -509,7 +509,7 @@ func testMany(t testing.TB, subs, emits, msgs int, stateful bool) {
 				if !ok {
 					panic("wat")
 				}
-				atomic.AddInt64(&r, int64(e.(EventB)))
+				r.Add(int64(e.(EventB)))
 			}
 			wait.Done()
 		}()
@@ -538,7 +538,7 @@ func testMany(t testing.TB, subs, emits, msgs int, stateful bool) {

 	wait.Wait()

-	if int(r) != 97*subs*emits*msgs {
+	if int(r.Load()) != 97*subs*emits*msgs {
 		t.Fatal("got wrong result")
 	}
 }
4 changes: 2 additions & 2 deletions p2p/host/eventbus/opts.go
@@ -12,7 +12,7 @@ type subSettings struct {
 	name string
 }

-var subCnt int64
+var subCnt atomic.Int64

 var subSettingsDefault = subSettings{
 	buffer: 16,
@@ -34,7 +34,7 @@ func newSubSettings() subSettings {
 		}
 		settings.name = fmt.Sprintf("%s-L%d", file, line)
 	} else {
-		settings.name = fmt.Sprintf("subscriber-%d", atomic.AddInt64(&subCnt, 1))
+		settings.name = fmt.Sprintf("subscriber-%d", subCnt.Add(1))
 	}
 	return settings
 }
14 changes: 7 additions & 7 deletions p2p/net/connmgr/connmgr.go
@@ -38,7 +38,7 @@ type BasicConnMgr struct {

 	// channel-based semaphore that enforces only a single trim is in progress
 	trimMutex sync.Mutex
-	connCount int32
+	connCount atomic.Int32
 	// to be accessed atomically. This is mimicking the implementation of a sync.Once.
 	// Take care of correct alignment when modifying this struct.
 	trimCount uint64
@@ -158,7 +158,7 @@ func NewConnManager(low, hi int, opts ...Option) (*BasicConnMgr, error) {
 // We don't pay attention to the silence period or the grace period.
 // We try to not kill protected connections, but if that turns out to be necessary, no connection is safe!
 func (cm *BasicConnMgr) memoryEmergency() {
-	connCount := int(atomic.LoadInt32(&cm.connCount))
+	connCount := int(cm.connCount.Load())
 	target := connCount - cm.cfg.lowWater
 	if target < 0 {
 		log.Warnw("Low on memory, but we only have a few connections", "num", connCount, "low watermark", cm.cfg.lowWater)
@@ -346,7 +346,7 @@ func (cm *BasicConnMgr) background() {
 	for {
 		select {
 		case <-ticker.C:
-			if atomic.LoadInt32(&cm.connCount) < int32(cm.cfg.highWater) {
+			if cm.connCount.Load() < int32(cm.cfg.highWater) {
 				// Below high water, skip.
 				continue
 			}
@@ -456,7 +456,7 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
 		return nil
 	}

-	if int(atomic.LoadInt32(&cm.connCount)) <= cm.cfg.lowWater {
+	if int(cm.connCount.Load()) <= cm.cfg.lowWater {
 		log.Info("open connection count below limit")
 		return nil
 	}
@@ -632,7 +632,7 @@ func (cm *BasicConnMgr) GetInfo() CMInfo {
 		LowWater:    cm.cfg.lowWater,
 		LastTrim:    lastTrim,
 		GracePeriod: cm.cfg.gracePeriod,
-		ConnCount:   int(atomic.LoadInt32(&cm.connCount)),
+		ConnCount:   int(cm.connCount.Load()),
 	}
 }

@@ -686,7 +686,7 @@ func (nn *cmNotifee) Connected(n network.Network, c network.Conn) {
 	}

 	pinfo.conns[c] = cm.clock.Now()
-	atomic.AddInt32(&cm.connCount, 1)
+	cm.connCount.Add(1)
 }

 // Disconnected is called by notifiers to inform that an existing connection has been closed or terminated.
@@ -715,7 +715,7 @@ func (nn *cmNotifee) Disconnected(n network.Network, c network.Conn) {
 	if len(cinf.conns) == 0 {
 		delete(s.peers, p)
 	}
-	atomic.AddInt32(&cm.connCount, -1)
+	cm.connCount.Add(-1)
 }

 // Listen is no-op in this implementation.
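A side note on the BasicConnMgr struct above: besides making atomic access explicit, atomic.Int64 and atomic.Uint64 are guaranteed 64-bit alignment even as struct fields on 32-bit platforms, so the field-ordering care mentioned in the trimCount comment is not needed for the typed atomics. A small illustrative sketch (hypothetical struct, not part of this PR):

package main

import (
	"fmt"
	"sync/atomic"
	"unsafe"
)

// counters mixes a small field with 64-bit atomics. With raw int64/uint64 fields this
// layout could leave the 64-bit words misaligned on 32-bit platforms; atomic.Int64 and
// atomic.Uint64 are always 64-bit aligned, so field order no longer matters here.
type counters struct {
	closed    bool
	connCount atomic.Int64
	trimCount atomic.Uint64
}

func main() {
	var c counters
	c.connCount.Add(1)
	c.trimCount.Add(2)
	fmt.Println(c.connCount.Load(), c.trimCount.Load()) // 1 2
	fmt.Println(unsafe.Offsetof(c.connCount)%8 == 0)    // true: 64-bit aligned
}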
10 changes: 5 additions & 5 deletions p2p/net/connmgr/connmgr_test.go
@@ -367,7 +367,7 @@ func TestDoubleConnection(t *testing.T) {
 	not.Connected(nil, conn)
 	cm.TagPeer(conn.RemotePeer(), "foo", 10)
 	not.Connected(nil, conn)
-	if cm.connCount != 1 {
+	if cm.connCount.Load() != 1 {
 		t.Fatal("unexpected number of connections")
 	}
 	if cm.segments.get(conn.RemotePeer()).peers[conn.RemotePeer()].value != 10 {
@@ -386,23 +386,23 @@ func TestDisconnected(t *testing.T) {
 	cm.TagPeer(conn.RemotePeer(), "foo", 10)

 	not.Disconnected(nil, randConn(t, nil))
-	if cm.connCount != 1 {
+	if cm.connCount.Load() != 1 {
 		t.Fatal("unexpected number of connections")
 	}
 	if cm.segments.get(conn.RemotePeer()).peers[conn.RemotePeer()].value != 10 {
 		t.Fatal("unexpected peer value")
 	}

 	not.Disconnected(nil, &tconn{peer: conn.RemotePeer()})
-	if cm.connCount != 1 {
+	if cm.connCount.Load() != 1 {
 		t.Fatal("unexpected number of connections")
 	}
 	if cm.segments.get(conn.RemotePeer()).peers[conn.RemotePeer()].value != 10 {
 		t.Fatal("unexpected peer value")
 	}

 	not.Disconnected(nil, conn)
-	if cm.connCount != 0 {
+	if cm.connCount.Load() != 0 {
 		t.Fatal("unexpected number of connections")
 	}
 	if cm.segments.countPeers() != 0 {
@@ -495,7 +495,7 @@ func TestQuickBurstRespectsSilencePeriod(t *testing.T) {
 	if closed > 20 {
 		t.Fatalf("should have closed at most 20 connections, closed: %d", closed)
 	}
-	if total := closed + int(cm.connCount); total != 30 {
+	if total := closed + int(cm.connCount.Load()); total != 30 {
 		t.Fatalf("expected closed connections + open conn count to equal 30, value: %d", total)
 	}
 }
4 changes: 2 additions & 2 deletions p2p/net/mock/mock_conn.go
@@ -14,7 +14,7 @@ import (
 	manet "github.com/multiformats/go-multiaddr/net"
 )

-var connCounter int64
+var connCounter atomic.Int64

 // conn represents one side's perspective of a
 // live connection between two peers.
@@ -49,7 +49,7 @@ func newConn(ln, rn *peernet, l *link, dir network.Direction) *conn {
 	c.local = ln.peer
 	c.remote = rn.peer
 	c.stat.Direction = dir
-	c.id = atomic.AddInt64(&connCounter, 1)
+	c.id = connCounter.Add(1)

 	c.localAddr = ln.ps.Addrs(ln.peer)[0]
 	for _, a := range rn.ps.Addrs(rn.peer) {
4 changes: 2 additions & 2 deletions p2p/net/mock/mock_stream.go
@@ -13,7 +13,7 @@ import (
 	"github.com/libp2p/go-libp2p/core/protocol"
 )

-var streamCounter int64
+var streamCounter atomic.Int64

 // stream implements network.Stream
 type stream struct {
@@ -57,7 +57,7 @@ func newStream(w *io.PipeWriter, r *io.PipeReader, dir network.Direction) *strea
 	s := &stream{
 		read:   r,
 		write:  w,
-		id:     atomic.AddInt64(&streamCounter, 1),
+		id:     streamCounter.Add(1),
 		reset:  make(chan struct{}, 1),
 		close:  make(chan struct{}, 1),
 		closed: make(chan struct{}),
10 changes: 5 additions & 5 deletions p2p/net/upgrader/listener_test.go
@@ -261,32 +261,32 @@ func TestAcceptQueueBacklogged(t *testing.T) {
 	defer ln.Close()

 	// setup AcceptQueueLength connections, but don't accept any of them
-	var counter int32 // to be used atomically
+	var counter atomic.Int32
 	doDial := func() {
 		conn, err := dial(t, u, ln.Multiaddr(), id, &network.NullScope{})
 		require.NoError(err)
-		atomic.AddInt32(&counter, 1)
+		counter.Add(1)
 		t.Cleanup(func() { conn.Close() })
 	}

 	for i := 0; i < upgrader.AcceptQueueLength; i++ {
 		go doDial()
 	}

-	require.Eventually(func() bool { return int(atomic.LoadInt32(&counter)) == upgrader.AcceptQueueLength }, 2*time.Second, 50*time.Millisecond)
+	require.Eventually(func() bool { return int(counter.Load()) == upgrader.AcceptQueueLength }, 2*time.Second, 50*time.Millisecond)

 	// dial a new connection. This connection should not complete setup, since the queue is full
 	go doDial()

 	time.Sleep(100 * time.Millisecond)
-	require.Equal(int(atomic.LoadInt32(&counter)), upgrader.AcceptQueueLength)
+	require.Equal(int(counter.Load()), upgrader.AcceptQueueLength)

 	// accept a single connection. Now the new connection should be set up, and fill the queue again
 	conn, err := ln.Accept()
 	require.NoError(err)
 	require.NoError(conn.Close())

-	require.Eventually(func() bool { return int(atomic.LoadInt32(&counter)) == upgrader.AcceptQueueLength+1 }, 2*time.Second, 50*time.Millisecond)
+	require.Eventually(func() bool { return int(counter.Load()) == upgrader.AcceptQueueLength+1 }, 2*time.Second, 50*time.Millisecond)
 }

 func TestListenerConnectionGater(t *testing.T) {