Skip to content

Commit

Permalink
metasync: amend GFN notifications
Browse files Browse the repository at this point in the history
* num connection-refused retries: sync vs notify
* metasync-notify: never reset handle-pending timer
* add err-work-channel-full, and use it
* with minor refactoring
* part two, prev. commit: 1a01903

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jul 23, 2024
1 parent 1a01903 commit abffbe3
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 25 deletions.
4 changes: 2 additions & 2 deletions ais/clustermap.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ func (sls *sls) notify(ver int64) {
return
}
sls.postCh <- ver
if len(sls.postCh) == cap(sls.postCh) {
nlog.ErrorDepth(1, "sls channel full: Smap v", ver) // unlikely
if l, c := len(sls.postCh), cap(sls.postCh); l > c/2 {
nlog.ErrorDepth(1, cos.ErrWorkChanFull, l, c, "Smap version:", ver) // unlikely
}
}
63 changes: 43 additions & 20 deletions ais/metasync.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,12 @@ const (

const failsync = "failing to sync"

const retryConnRefused = 4
const (
retrySyncRefused = 4
retryNotifyRefused = 2
)

const workChanCap = 32

type (
revs interface {
Expand All @@ -113,7 +118,7 @@ type (
wg *sync.WaitGroup
failedCnt *atomic.Int32
pairs []revsPair
reqType int // enum: reqSync, etc.
ty int // enum: { reqSync, reqNotify }
}
msPayload map[string][]byte // tag => revs' body
ndRevs map[string]int64 // tag => version (see nodesRevs)
Expand Down Expand Up @@ -155,7 +160,7 @@ func newMetasyncer(p *proxy) (y *metasyncer) {
y.lastSynced = make(map[string]revs, revsMaxTags)

y.stopCh = make(chan struct{}, 1)
y.workCh = make(chan revsReq, 32)
y.workCh = make(chan revsReq, workChanCap)

y.retryTimer = time.NewTimer(time.Hour)
y.retryTimer.Stop()
Expand All @@ -164,9 +169,8 @@ func newMetasyncer(p *proxy) (y *metasyncer) {
}

func (y *metasyncer) Run() error {
nlog.Infof("Starting %s", y.Name())
nlog.Infoln("Starting", y.Name())
for {
config := cmn.GCO.Get()
select {
case revsReq, ok := <-y.workCh:
if !ok {
Expand All @@ -180,14 +184,17 @@ func (y *metasyncer) Run() error {
y.timerStopped = true
break
}
failedCnt := y.do(revsReq.pairs, revsReq.reqType)
failedCnt := y.do(revsReq.pairs, revsReq.ty)
if revsReq.wg != nil {
if revsReq.failedCnt != nil {
revsReq.failedCnt.Store(int32(failedCnt))
}
revsReq.wg.Done()
}
if y.timerStopped && failedCnt > 0 {

// timed retry, via handlePending()
if y.timerStopped && failedCnt > 0 && revsReq.ty != reqNotify {
config := cmn.GCO.Get()
y.retryTimer.Reset(config.Periodic.RetrySyncTime.D())
y.timerStopped = false
}
Expand All @@ -197,8 +204,14 @@ func (y *metasyncer) Run() error {
case <-y.retryTimer.C:
failedCnt := y.handlePending()
if failedCnt > 0 {
config := cmn.GCO.Get()
y.retryTimer.Reset(config.Periodic.RetrySyncTime.D())
y.timerStopped = false

if l, c := len(y.workCh), cap(y.workCh); l > c/2 {
nlog.Errorln("Warning:", y.p.String(), "[hp]:", cos.ErrWorkChanFull, "len", l, "cap", c,
"failed", failedCnt)
}
} else {
y.timerStopped = true
}
Expand All @@ -210,8 +223,11 @@ func (y *metasyncer) Run() error {
}

func (y *metasyncer) Stop(err error) {
nlog.Infof("Stopping %s: %v", y.Name(), err)

if err == nil {
nlog.Infoln("Stopping", y.Name())
} else {
nlog.Infoln("Stopping", y.Name()+":", err)
}
y.stopCh <- struct{}{}
close(y.stopCh)
}
Expand All @@ -220,7 +236,7 @@ func (y *metasyncer) Stop(err error) {
func (y *metasyncer) notify(wait bool, pair revsPair) (failedCnt int) {
var (
failedCntAtomic = atomic.NewInt32(0)
req = revsReq{pairs: []revsPair{pair}, reqType: reqNotify}
req = revsReq{pairs: []revsPair{pair}, ty: reqNotify}
)
if y.isPrimary() != nil {
return
Expand Down Expand Up @@ -248,7 +264,7 @@ func (y *metasyncer) sync(pairs ...revsPair) *sync.WaitGroup {
return req.wg
}
req.wg.Add(1)
req.reqType = reqSync
req.ty = reqSync
y.workCh <- req
return req.wg
}
Expand Down Expand Up @@ -281,7 +297,7 @@ func (y *metasyncer) do(pairs []revsPair, reqT int) (failedCnt int) {
method = http.MethodPost
}
if nlog.Stopping() {
return
return 0
}

// step: build payload and update last sync-ed
Expand Down Expand Up @@ -326,11 +342,13 @@ func (y *metasyncer) do(pairs []revsPair, reqT int) (failedCnt int) {
body = payload.marshal(y.p.gmm)
to = core.AllNodes
smap = y.p.owner.smap.get()
retries = retrySyncRefused // connection-refused
)
defer body.Free()

if reqT == reqNotify {
to = core.Targets
retries = retryNotifyRefused
}
args := allocBcArgs()
args.req = cmn.HreqArgs{Method: method, Path: urlPath, BodyR: body}
Expand Down Expand Up @@ -368,9 +386,10 @@ func (y *metasyncer) do(pairs []revsPair, reqT int) (failedCnt int) {
}
}
freeBcastRes(results)

// step: handle connection-refused right away
lr := len(refused)
for range retryConnRefused {
for range retries {
if len(refused) == 0 {
if lr > 0 {
nlog.Infof("%s: %d node%s sync-ed", y.p, lr, cos.Plural(lr))
Expand All @@ -381,12 +400,13 @@ func (y *metasyncer) do(pairs []revsPair, reqT int) (failedCnt int) {
smap = y.p.owner.smap.get()
if !smap.isPrimary(y.p.si) {
y.becomeNonPrimary()
return
return 0
}
if !y.handleRefused(method, urlPath, body, refused, pairs, smap) {
break
}
}

// step: housekeep and return new pending
smap = y.p.owner.smap.get()
for sid := range y.nodesRevs {
Expand All @@ -396,7 +416,7 @@ func (y *metasyncer) do(pairs []revsPair, reqT int) (failedCnt int) {
}
}
failedCnt += len(refused)
return
return failedCnt
}

func (y *metasyncer) jit(pair revsPair) revs {
Expand Down Expand Up @@ -460,7 +480,7 @@ func (y *metasyncer) handleRefused(method, urlPath string, body io.Reader, refus
// failing to sync
if res.status == http.StatusConflict {
if e := err2MsyncErr(res.err); e != nil {
msg := fmt.Sprintf("%s [hr]: %s %s: %s [%v]", y.p.si, failsync, res.si, e.Message, e.Cii)
msg := fmt.Sprintf("%s [hr]: %s %s: %s [%v]", y.p, failsync, res.si, e.Message, e.Cii)
if !y.remainPrimary(e, res.si, smap) {
nlog.Errorln(msg + " - aborting")
freeBcastRes(results)
Expand Down Expand Up @@ -522,8 +542,11 @@ func (y *metasyncer) _pending() (pending meta.NodeMap, smap *smapX) {
func (y *metasyncer) handlePending() (failedCnt int) {
pending, smap := y._pending()
if len(pending) == 0 {
nlog.Infof("no pending revs - all good")
return
nlog.Infoln("no pending revs - all good")
return 0
}
if nlog.Stopping() {
return 0
}
var (
l = len(y.lastSynced)
Expand Down Expand Up @@ -582,7 +605,7 @@ func (y *metasyncer) handlePending() (failedCnt int) {
nlog.Warningf("%s [hp]: %s %s: %v(%d)", y.p, failsync, res.si, res.err, res.status)
}
freeBcastRes(results)
return
return failedCnt
}

// cie and isPrimary checks versus remote clusterInfo
Expand All @@ -608,7 +631,7 @@ func (y *metasyncer) remainPrimary(e *errMsync, from *meta.Snode, smap *smapX) b
return true
}
nlog.Errorf("%s: [%s %s] vs %v from %s", ciError(90), y.p, smap.StringEx(), e.Cii, from)
return true // TODO: iffy; may need to do more
return true
}

func (y *metasyncer) isPrimary() (err error) {
Expand Down
2 changes: 2 additions & 0 deletions cmn/cos/err.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ var (
errQuantityNonNegative = errors.New("quantity should not be negative")
)

var ErrWorkChanFull = errors.New("work channel full")

var errBufferUnderrun = errors.New("buffer underrun")

// ErrNotFound
Expand Down
2 changes: 1 addition & 1 deletion mirror/put_copies.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (r *XactPut) stop() (err error) {
err = fmt.Errorf("%s: dropped %d object%s", r, n, cos.Plural(n))
}
if cnt := r.chanFull.Load(); (cnt >= 10 && cnt <= 20) || (cnt > 0 && cmn.Rom.FastV(5, cos.SmoduleMirror)) {
nlog.Errorln("work channel full (all mp workers)", r.String(), cnt)
nlog.Errorln(cos.ErrWorkChanFull, "(all mp workers)", r.String(), "cnt", cnt)
}
return
}
Expand Down
2 changes: 1 addition & 1 deletion transport/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func (s *streamBase) sendLoop(dryrun bool) {
s.streamer.abortPending(err, false /*completions*/)

if cnt := s.chanFull.Load(); (cnt >= 10 && cnt <= 20) || (cnt > 0 && verbose) {
nlog.Errorln("work channel full", s.lid, cnt)
nlog.Errorln(cos.ErrWorkChanFull, s.lid, "cnt", cnt)
}
}

Expand Down
2 changes: 1 addition & 1 deletion xact/xs/tcobjs.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (r *XactTCObjs) Do(msg *cmn.TCObjsMsg) {
if l == c {
cnt := r.chanFull.Inc()
if (cnt >= 10 && cnt <= 20) || (cnt > 0 && cmn.Rom.FastV(5, cos.SmoduleXs)) {
nlog.Errorln("work channel full", r.Name())
nlog.Errorln(cos.ErrWorkChanFull, r.Name(), "cnt", cnt)
}
}
}
Expand Down

0 comments on commit abffbe3

Please sign in to comment.