Skip to content

Commit

Permalink
intra-cluster notifications: reduce locking, mem allocations
Browse files Browse the repository at this point in the history
* micro-optimize

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Sep 6, 2024
1 parent ace4bce commit b7965b7
Showing 1 changed file with 66 additions and 63 deletions.
129 changes: 66 additions & 63 deletions ais/prxnotif.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/NVIDIA/aistore/api/apc"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/atomic"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/cmn/mono"
Expand All @@ -34,8 +35,9 @@ const notifsName = "p-notifs"

type (
listeners struct {
m map[string]nl.Listener // [UUID => NotifListener]
sync.RWMutex
m map[string]nl.Listener // [UUID => NotifListener]
l atomic.Int32 // current len(m)
mtx sync.RWMutex
}

notifs struct {
Expand All @@ -47,6 +49,8 @@ type (
removed []nl.Listener // reusable slice of `nl` to remove from `nls`
finished []nl.Listener // reusable slice of `nl` to add to `fin`

tempnl []nl.Listener

smapVer int64
mu sync.Mutex
}
Expand Down Expand Up @@ -91,6 +95,8 @@ func (n *notifs) init(p *proxy) {
n.removed = make([]nl.Listener, 16)
n.finished = make([]nl.Listener, 16)

n.tempnl = make([]nl.Listener, 0, 16)

hk.Reg(notifsName+hk.NameSuffix, n.housekeep, hk.PruneActiveIval)
n.p.Sowner().Listeners().Reg(n)
}
Expand Down Expand Up @@ -267,13 +273,8 @@ func (n *notifs) findAll(flt nlFilter) (nls []nl.Listener) {
return
}

func (n *notifs) size() (size int) {
n.nls.RLock()
n.fin.RLock()
size = n.nls.len() + n.fin.len()
n.fin.RUnlock()
n.nls.RUnlock()
return
func (n *notifs) size() int32 {
return n.nls.l.Load() + n.fin.l.Load()
}

// PRECONDITION: `nl` should be under lock.
Expand Down Expand Up @@ -350,7 +351,7 @@ func abortReq(nl nl.Listener) cmn.HreqArgs {

func (n *notifs) housekeep() time.Duration {
now := time.Now().UnixNano()
n.fin.Lock()
n.fin.mtx.Lock()
for _, nl := range n.fin.m {
timeout := hk.DelOldIval
if nl.Kind() == apc.ActList {
Expand All @@ -360,23 +361,24 @@ func (n *notifs) housekeep() time.Duration {
n.fin.del(nl, true /*locked*/)
}
}
n.fin.Unlock()
n.fin.mtx.Unlock()

n.nls.RLock() // TODO: atomic instead
if n.nls.len() == 0 {
n.nls.RUnlock()
if n.nls.l.Load() == 0 {
return hk.PruneActiveIval
}
tempn := make(map[string]nl.Listener, n.nls.len())
for uuid, nl := range n.nls.m {
tempn[uuid] = nl

n.nls.mtx.RLock()
n.tempnl = n.tempnl[:0]
for _, nl := range n.nls.m {
n.tempnl = append(n.tempnl, nl)
}
n.nls.RUnlock()
for _, nl := range tempn {
n.nls.mtx.RUnlock()

for _, nl := range n.tempnl {
n.bcastGetStats(nl, hk.PruneActiveIval)
}
// cleanup temp cloned notifs
clear(tempn)
clear(n.tempnl)

return hk.PruneActiveIval
}
Expand Down Expand Up @@ -457,15 +459,14 @@ func (n *notifs) ListenSmapChanged() {
}
n.smapVer = smap.Version

n.nls.RLock()
if n.nls.len() == 0 {
n.nls.RUnlock()
if n.nls.l.Load() == 0 {
return
}
var (
remnl = make(map[string]nl.Listener)
remid = make(cos.StrKVs)
)
n.nls.mtx.RLock()
for uuid, nl := range n.nls.m {
nl.RLock()
for sid := range nl.ActiveNotifiers() {
Expand All @@ -477,7 +478,7 @@ func (n *notifs) ListenSmapChanged() {
}
nl.RUnlock()
}
n.nls.RUnlock()
n.nls.mtx.RUnlock()
if len(remnl) == 0 {
return
}
Expand All @@ -502,17 +503,17 @@ repeat:
}

// cleanup and callback w/ nl.Err
n.fin.Lock()
n.fin.mtx.Lock()
for uuid, nl := range remnl {
debug.Assert(nl.UUID() == uuid)
n.fin.add(nl, true /*locked*/)
}
n.fin.Unlock()
n.nls.Lock()
n.fin.mtx.Unlock()
n.nls.mtx.Lock()
for _, nl := range remnl {
n.del(nl, true /*locked*/)
}
n.nls.Unlock()
n.nls.mtx.Unlock()

for _, nl := range remnl {
nl.Callback(nl, now)
Expand All @@ -522,26 +523,25 @@ repeat:
clear(remid)
}

func (n *notifs) MarshalJSON() (data []byte, err error) {
t := jsonNotifs{}
n.nls.RLock()
n.fin.RLock()
if n.nls.len() == 0 && n.fin.len() == 0 {
n.fin.RUnlock()
n.nls.RUnlock()
return
func (n *notifs) MarshalJSON() ([]byte, error) {
if n.size() == 0 {
return nil, nil
}
t.Running = make([]*notifListenMsg, 0, n.nls.len())
t.Finished = make([]*notifListenMsg, 0, n.fin.len())
t := jsonNotifs{}

n.nls.mtx.RLock()
n.fin.mtx.RLock()
t.Running = make([]*notifListenMsg, 0, len(n.nls.m))
t.Finished = make([]*notifListenMsg, 0, len(n.fin.m))
for _, nl := range n.nls.m {
t.Running = append(t.Running, newNLMsg(nl))
}
n.nls.RUnlock()
n.nls.mtx.RUnlock()

for _, nl := range n.fin.m {
t.Finished = append(t.Finished, newNLMsg(nl))
}
n.fin.RUnlock()
n.fin.mtx.RUnlock()

return jsoniter.Marshal(t)
}
Expand All @@ -567,8 +567,8 @@ func (n *notifs) UnmarshalJSON(data []byte) (err error) {
// (under lock)
func (n *notifs) apply(t *jsonNotifs) {
added, removed, finished := n.added[:0], n.removed[:0], n.finished[:0]
n.nls.RLock()
n.fin.RLock()
n.nls.mtx.RLock()
n.fin.mtx.RLock()
for _, m := range t.Running {
if n.fin.exists(m.nl.UUID()) || n.nls.exists(m.nl.UUID()) {
continue
Expand All @@ -585,34 +585,34 @@ func (n *notifs) apply(t *jsonNotifs) {
}
finished = append(finished, m.nl)
}
n.fin.RUnlock()
n.nls.RUnlock()
n.fin.mtx.RUnlock()
n.nls.mtx.RUnlock()

if len(removed) == 0 && len(added) == 0 {
goto fin
}

// Add/Remove `nl` - `n.nls`.
n.nls.Lock()
n.nls.mtx.Lock()
for _, nl := range added {
n.nls.add(nl, true /*locked*/)
}
for _, nl := range removed {
n.nls.del(nl, true /*locked*/)
}
n.nls.Unlock()
n.nls.mtx.Unlock()

fin:
if len(finished) == 0 {
return
}

n.fin.Lock()
n.fin.mtx.Lock()
// Add `nl` to `n.fin`.
for _, nl := range finished {
n.fin.add(nl, true /*locked*/)
}
n.fin.Unlock()
n.fin.mtx.Unlock()

// Call the Callback for each `nl` marking it finished.
now := time.Now().UnixNano()
Expand All @@ -622,48 +622,51 @@ fin:
}

func (n *notifs) String() string {
l, f := n.nls.len(), n.fin.len() // not r-locking
return fmt.Sprintf("%s (nls=%d, fin=%d)", notifsName, l, f)
// not r-locking
return fmt.Sprintf("%s (nls=%d, fin=%d)", notifsName, len(n.nls.m), len(n.fin.m))
}

///////////////
// listeners //
///////////////

func newListeners() *listeners { return &listeners{m: make(map[string]nl.Listener, 64)} }
func (l *listeners) len() int { return len(l.m) }

func (l *listeners) entry(uuid string) (entry nl.Listener, exists bool) {
l.RLock()
l.mtx.RLock()
entry, exists = l.m[uuid]
l.RUnlock()
return
l.mtx.RUnlock()
return entry, exists
}

func (l *listeners) add(nl nl.Listener, locked bool) (exists bool) {
if !locked {
l.Lock()
l.mtx.Lock()
}
if _, exists = l.m[nl.UUID()]; !exists {
l.m[nl.UUID()] = nl
a := l.l.Inc()
debug.Assert(len(l.m) == int(a), a, " vs ", len(l.m))
}
if !locked {
l.Unlock()
l.mtx.Unlock()
}
return
}

func (l *listeners) del(nl nl.Listener, locked bool) (ok bool) {
if !locked {
l.Lock()
l.mtx.Lock()
} else {
debug.AssertRWMutexLocked(&l.RWMutex)
debug.AssertRWMutexLocked(&l.mtx)
}
if _, ok = l.m[nl.UUID()]; ok {
delete(l.m, nl.UUID())
a := l.l.Dec()
debug.Assert(len(l.m) == int(a), a, " vs ", len(l.m))
}
if !locked {
l.Unlock()
l.mtx.Unlock()
}
return
}
Expand All @@ -680,7 +683,7 @@ func (l *listeners) exists(uuid string) (ok bool) {
// (compare with the below)
func (l *listeners) find(flt nlFilter) (nl nl.Listener) {
var ftime int64
l.RLock()
l.mtx.RLock()
for _, listener := range l.m {
if !flt.match(listener) {
continue
Expand All @@ -696,19 +699,19 @@ func (l *listeners) find(flt nlFilter) (nl nl.Listener) {
}
ftime = et
}
l.RUnlock()
l.mtx.RUnlock()
return
}

// returns all matches
func (l *listeners) findAll(flt nlFilter) (nls []nl.Listener) {
l.RLock()
l.mtx.RLock()
for _, listener := range l.m {
if flt.match(listener) {
nls = append(nls, listener)
}
}
l.RUnlock()
l.mtx.RUnlock()
return
}

Expand Down

0 comments on commit b7965b7

Please sign in to comment.