diff --git a/ais/htrun.go b/ais/htrun.go index 9296bf9697..431ed2a009 100644 --- a/ais/htrun.go +++ b/ais/htrun.go @@ -101,6 +101,10 @@ func (h *htrun) Sowner() meta.Sowner { return h.owner.smap } func (h *htrun) PageMM() *memsys.MMSA { return h.gmm } func (h *htrun) ByteMM() *memsys.MMSA { return h.smm } +func (h *htrun) errStopping() error { + return errors.New(h.si.Name() + " is stopping") +} + // NOTE: currently, only 'resume' (see also: kaSuspendMsg) func (h *htrun) smapUpdatedCB(_, _ *smapX, nfl, ofl cos.BitFlags) { if ofl.IsAnySet(meta.SnodeMaintDecomm) && !nfl.IsAnySet(meta.SnodeMaintDecomm) { @@ -620,6 +624,8 @@ func (h *htrun) _call(si *meta.Snode, bargs *bcastArgs, results *bcastResults) { freeCargs(cargs) } +const _callHdrLen = 5 + func (h *htrun) call(args *callArgs, smap *smapX) (res *callResult) { var ( req *http.Request @@ -638,10 +644,6 @@ func (h *htrun) call(args *callArgs, smap *smapX) (res *callResult) { args.req.Base = args.si.ControlNet.URL // by default, use intra-cluster control network } - if args.req.Header == nil { - args.req.Header = make(http.Header) - } - switch args.timeout { case apc.DefaultTimeout: req, res.err = args.req.Req() @@ -682,14 +684,18 @@ func (h *htrun) call(args *callArgs, smap *smapX) (res *callResult) { return res } - req.Header.Set(apc.HdrCallerID, h.SID()) - req.Header.Set(apc.HdrCallerName, h.si.Name()) + // req header + if args.req.Header == nil { + args.req.Header = make(http.Header, _callHdrLen) + } if smap.vstr != "" { if smap.IsPrimary(h.si) { req.Header.Set(apc.HdrCallerIsPrimary, "true") } req.Header.Set(apc.HdrCallerSmapVer, smap.vstr) } + req.Header.Set(apc.HdrCallerID, h.SID()) + req.Header.Set(apc.HdrCallerName, h.si.Name()) req.Header.Set(cos.HdrUserAgent, ua) resp, res.err = client.Do(req) @@ -855,14 +861,6 @@ func (h *htrun) bcastNodes(bargs *bcastArgs) sliceResults { if si.ID() == h.si.ID() { continue } - - // TODO: remove - debug.Func(func() { - if si.URL(bargs.network) == h.si.URL(bargs.network) { 
- nlog.Errorf(fmtErrNetInfoChanged, h, si.StringEx(), si.URL(bargs.network)) - } - }) - if !bargs.ignoreMaintenance && si.InMaintOrDecomm() { continue } @@ -1843,7 +1841,7 @@ func (h *htrun) join(query url.Values, htext htext, contactURLs ...string) (res for range 4 { // retry for _, candidateURL := range candidates { if nlog.Stopping() { - return res, errors.New(h.String() + " is stopping") + return res, h.errStopping() } if resPrev != nil { freeCR(resPrev) @@ -1873,13 +1871,14 @@ func (h *htrun) join(query url.Values, htext htext, contactURLs ...string) (res return res, fmt.Errorf("%s: failed to discover a new smap", h) } if nsti.Smap.Version < smap.version() { - return res, fmt.Errorf("%s: current %s version is newer than %d from the primary proxy (%s)", h, smap, nsti.Smap.Version, nsti.Smap.Primary.ID) + return res, fmt.Errorf("%s: current %s version is newer than %d from the primary proxy (%s)", + h, smap, nsti.Smap.Version, nsti.Smap.Primary.ID) } primaryURL = nsti.Smap.Primary.PubURL // Daemon is stopping skip register if nlog.Stopping() { - return res, errors.New(h.String() + " is stopping") + return res, h.errStopping() } res = h.regTo(primaryURL, nil, apc.DefaultTimeout, query, htext, false /*keepalive*/) if res.err == nil { @@ -1940,43 +1939,53 @@ func (h *htrun) regTo(url string, psi *meta.Snode, tout time.Duration, q url.Val return res } -func (h *htrun) sendKalive(smap *smapX, htext htext, timeout time.Duration, fast bool) (pid string, status int, err error) { +// (fast path: nodes => primary) +func (h *htrun) fastKalive(smap *smapX, timeout time.Duration, ecActive bool) (pid string, hdr http.Header, err error) { if nlog.Stopping() { - err = errors.New(h.String() + " is stopping") - return + return "", hdr, h.errStopping() } + debug.Assert(h.ClusterStarted()) + primaryURL, psi := h.getPrimaryURLAndSI(smap, nil) pid = psi.ID() - if fast { - // fast path - debug.Assert(h.ClusterStarted()) - path := apc.URLPathCluKalive.Join(h.SID()) - cargs := 
allocCargs() - { - cargs.si = psi - cargs.req = cmn.HreqArgs{Method: http.MethodPost, Base: primaryURL, Path: path} - cargs.timeout = timeout - } - res := h.call(cargs, smap) - freeCargs(cargs) - err = res.err - freeCR(res) - return + cargs := allocCargs() + { + cargs.si = psi + cargs.req = cmn.HreqArgs{Method: http.MethodPost, Base: primaryURL, Path: apc.URLPathCluKalive.Join(h.SID())} + cargs.timeout = timeout + } + if ecActive { + hdr := make(http.Header, _callHdrLen) + hdr.Set(apc.HdrActiveEC, "true") + cargs.req.Header = hdr + } + + res := h.call(cargs, smap) + freeCargs(cargs) + err, hdr = res.err, res.header + + freeCR(res) + return pid, hdr, err +} + +// (slow path: nodes => primary) +func (h *htrun) slowKalive(smap *smapX, htext htext, timeout time.Duration) (pid string, status int, err error) { + if nlog.Stopping() { + return "", 0, h.errStopping() } + primaryURL, psi := h.getPrimaryURLAndSI(smap, nil) + pid = psi.ID() - // slow path res := h.regTo(primaryURL, psi, timeout, nil, htext, true /*keepalive*/) if res.err != nil { if strings.Contains(res.err.Error(), ciePrefix) { cos.ExitLog(res.err) // FATAL: cluster integrity error (cie) } status, err = res.status, res.err - freeCR(res) - return } freeCR(res) - return + return pid, status, err } func (h *htrun) getPrimaryURLAndSI(smap *smapX, config *cmn.Config) (string, *meta.Snode) { diff --git a/ais/kalive.go b/ais/kalive.go index 53d9c39264..2d7a6f3115 100644 --- a/ais/kalive.go +++ b/ais/kalive.go @@ -18,6 +18,7 @@ import ( "github.com/NVIDIA/aistore/cmn/mono" "github.com/NVIDIA/aistore/cmn/nlog" "github.com/NVIDIA/aistore/core/meta" + "github.com/NVIDIA/aistore/ec" "github.com/NVIDIA/aistore/stats" ) @@ -37,8 +38,8 @@ const ( type ( keepaliver interface { - sendKalive(*smapX, time.Duration, bool) (string, int, error) - heardFrom(sid string) + sendKalive(*smapX, time.Duration, int64 /*now*/, bool) (string, int, error) + heardFrom(sid string) int64 do(config *cmn.Config) (stopped bool) timeToPing(sid 
string) bool ctrl(msg string) @@ -73,8 +74,8 @@ type ( } hbTracker interface { - HeardFrom(id string, now int64) // callback for 'id' to respond - TimedOut(id string) bool // true if 'id` didn't keepalive or called (via "heard") within the interval (above) + HeardFrom(id string, now int64) int64 // callback for 'id' to respond + TimedOut(id string) bool // true if 'id` didn't keepalive or called (via "heard") within the interval (above) reg(id string) set(interval time.Duration) bool @@ -137,12 +138,18 @@ func (tkr *talive) cluUptime(now int64) (elapsed time.Duration) { return } -func (tkr *talive) sendKalive(smap *smapX, timeout time.Duration, fast bool) (string, int, error) { +func (tkr *talive) sendKalive(smap *smapX, timeout time.Duration, _ int64, fast bool) (pid string, status int, err error) { if fast { + // additionally interrupted, restarted := tkr.t.interruptedRestarted() fast = !interrupted && !restarted } - return tkr.t.sendKalive(smap, tkr.t, timeout, fast) + if fast { + debug.Assert(ec.ECM != nil) + pid, _, err = tkr.t.fastKalive(smap, timeout, ec.ECM.IsActive()) + return pid, 0, err + } + return tkr.t.slowKalive(smap, tkr.t, timeout) } func (tkr *talive) do(config *cmn.Config) (stopped bool) { @@ -200,9 +207,22 @@ func (pkr *palive) cluUptime(now int64) (elapsed time.Duration) { return } -func (pkr *palive) sendKalive(smap *smapX, timeout time.Duration, fast bool) (string, int, error) { +func (pkr *palive) sendKalive(smap *smapX, timeout time.Duration, now int64, fast bool) (string, int, error) { debug.Assert(!smap.isPrimary(pkr.p.si)) - return pkr.p.htrun.sendKalive(smap, nil /*htext*/, timeout, fast) + + if fast { + pid, hdr, err := pkr.p.fastKalive(smap, timeout, false /*ec active*/) + if err == nil { + // check resp header from primary + // (see: _respActiveEC; compare with: _recvActiveEC) + if isActiveEC(hdr) { + pkr.p.lastEC.Store(now) + } + } + return pid, 0, err + } + + return pkr.p.slowKalive(smap, nil /*htext*/, timeout) } func (pkr 
*palive) do(config *cmn.Config) (stopped bool) { @@ -432,8 +452,8 @@ func (pkr *palive) retry(si *meta.Snode, ticker *time.Ticker, timeout time.Durat func (k *keepalive) Name() string { return k.name } -func (k *keepalive) heardFrom(sid string) { - k.hb.HeardFrom(sid, 0 /*now*/) +func (k *keepalive) heardFrom(sid string) int64 { + return k.hb.HeardFrom(sid, 0 /*now*/) } // wait for stats-runner to set startedUp=true @@ -538,7 +558,7 @@ func (k *keepalive) do(smap *smapX, si *meta.Snode, config *cmn.Config) (stopped return } fast = k.k.cluUptime(started) > max(k.interval<<2, config.Timeout.Startup.D()>>1) - cpid, status, err := k.k.sendKalive(smap, timeout, fast) + cpid, status, err := k.k.sendKalive(smap, timeout, started, fast) if err == nil { now := mono.NanoTime() k.statsT.Add(stats.KeepAliveLatency, now-started) @@ -564,7 +584,7 @@ func (k *keepalive) do(smap *smapX, si *meta.Snode, config *cmn.Config) (stopped // and therefore not skipping keepalive req (compare with palive.retry) i++ started := mono.NanoTime() - pid, status, err = k.k.sendKalive(nil, timeout, false) + pid, status, err = k.k.sendKalive(nil, timeout, started, false) if pid == si.ID() { return // elected as primary } @@ -622,7 +642,7 @@ func (k *keepalive) paused() bool { return k.tickerPaused.Load() } func newHB(interval time.Duration) *heartBeat { return &heartBeat{interval: interval} } -func (hb *heartBeat) HeardFrom(id string, now int64) { +func (hb *heartBeat) HeardFrom(id string, now int64) int64 { var ( val *int64 v, ok = hb.last.Load(id) @@ -637,6 +657,7 @@ func (hb *heartBeat) HeardFrom(id string, now int64) { hb.last.Store(id, val) } ratomic.StoreInt64(val, now) + return now } func (hb *heartBeat) TimedOut(id string) bool { diff --git a/ais/proxy.go b/ais/proxy.go index 1c9d6d4e94..cbb0957c54 100644 --- a/ais/proxy.go +++ b/ais/proxy.go @@ -72,6 +72,8 @@ type ( mu sync.RWMutex in atomic.Bool } + lastEC atomic.Int64 // last active EC via apc.HdrActiveEC (mono time) + settingNewPrimary 
atomic.Bool // primary executing "set new primary" request (state) readyToFastKalive atomic.Bool // primary can accept fast keepalives } diff --git a/ais/prxclu.go b/ais/prxclu.go index ceda6b6d00..7bb2b4f990 100644 --- a/ais/prxclu.go +++ b/ais/prxclu.go @@ -385,7 +385,7 @@ func (p *proxy) httpclupost(w http.ResponseWriter, r *http.Request) { case apc.Keepalive: // fast path(?) if len(apiItems) > 1 { - p.fastKalive(w, r, smap, config, apiItems[1]) + p.fastKaliveRsp(w, r, smap, config, apiItems[1] /*sid*/) return } @@ -567,7 +567,8 @@ func (p *proxy) httpclupost(w http.ResponseWriter, r *http.Request) { go p.mcastJoined(nsi, msg, nsi.Flags, &regReq) } -func (p *proxy) fastKalive(w http.ResponseWriter, r *http.Request, smap *smapX, config *cmn.Config, sid string) { +// respond to fastKalive from nodes +func (p *proxy) fastKaliveRsp(w http.ResponseWriter, r *http.Request, smap *smapX, config *cmn.Config, sid string) { fast := p.readyToFastKalive.Load() if !fast { var ( @@ -586,7 +587,13 @@ func (p *proxy) fastKalive(w http.ResponseWriter, r *http.Request, smap *smapX, ) if callerID == sid && callerSver != "" && callerSver == smap.vstr { if si := smap.GetNode(sid); si != nil { - p.keepalive.heardFrom(sid) + now := p.keepalive.heardFrom(sid) + + if si.IsTarget() { + p._recvActiveEC(r.Header, now) + } else { + p._respActiveEC(w.Header(), now) + } return } } @@ -965,7 +972,7 @@ func (p *proxy) httpcluput(w http.ResponseWriter, r *http.Request) { } if nlog.Stopping() { - p.writeErr(w, r, fmt.Errorf("%s is stopping", p), http.StatusServiceUnavailable) + p.writeErr(w, r, p.errStopping(), http.StatusServiceUnavailable) return } if !p.NodeStarted() { @@ -2202,7 +2209,7 @@ func (p *proxy) _unregNodePre(ctx *smapModifier, clone *smapX) error { // rebalance's `can`: factors not including cluster map func (p *proxy) canRebalance() (err error) { if nlog.Stopping() { - return fmt.Errorf("%s is stopping", p) + return p.errStopping() } smap := p.owner.smap.get() if err = 
smap.validate(); err != nil { @@ -2258,3 +2265,31 @@ func mustRebalance(ctx *smapModifier, cur *smapX) bool { } return false } + +// +// (EC is active) and (is EC active?) via fastKalive +// + +// default config.Timeout.EcStreams +const ( + ecTimeout = 10 * time.Minute +) + +func isActiveEC(hdr http.Header) (ok bool) { + _, ok = hdr[apc.HdrActiveEC] + return ok +} + +// (target send => primary) +func (p *proxy) _recvActiveEC(rhdr http.Header, now int64) { + if isActiveEC(rhdr) { + p.lastEC.Store(now) + } +} + +// (primary resp => non-primary) +func (p *proxy) _respActiveEC(whdr http.Header, now int64) { + if time.Duration(now-p.lastEC.Load()) < ecTimeout { + whdr.Set(apc.HdrActiveEC, "true") + } +} diff --git a/ais/prxnotif_internal_test.go b/ais/prxnotif_internal_test.go index 4bed5dcbec..2733bde105 100644 --- a/ais/prxnotif_internal_test.go +++ b/ais/prxnotif_internal_test.go @@ -25,10 +25,10 @@ import ( type nopHB struct{} -func (*nopHB) HeardFrom(string, int64) {} -func (*nopHB) TimedOut(string) bool { return false } -func (*nopHB) reg(string) {} -func (*nopHB) set(time.Duration) bool { return false } +func (*nopHB) HeardFrom(string, int64) int64 { return 0 } +func (*nopHB) TimedOut(string) bool { return false } +func (*nopHB) reg(string) {} +func (*nopHB) set(time.Duration) bool { return false } var _ hbTracker = (*nopHB)(nil) diff --git a/ais/tgtcp.go b/ais/tgtcp.go index e44a9f18de..856599c325 100644 --- a/ais/tgtcp.go +++ b/ais/tgtcp.go @@ -1265,13 +1265,9 @@ func (t *target) headt2t(lom *core.LOM, tsi *meta.Snode, smap *smapX) (ok bool) cargs.si = tsi cargs.req = cmn.HreqArgs{ Method: http.MethodHead, - Header: http.Header{ - apc.HdrCallerID: []string{t.SID()}, - apc.HdrCallerName: []string{t.callerName()}, - }, - Base: tsi.URL(cmn.NetIntraControl), - Path: apc.URLPathObjects.Join(lom.Bck().Name, lom.ObjName), - Query: q, + Base: tsi.URL(cmn.NetIntraControl), + Path: apc.URLPathObjects.Join(lom.Bck().Name, lom.ObjName), + Query: q, } cargs.timeout = 
cmn.Rom.CplaneOperation() } @@ -1292,12 +1288,8 @@ func (t *target) headObjBcast(lom *core.LOM, smap *smapX) *meta.Snode { args := allocBcArgs() args.req = cmn.HreqArgs{ Method: http.MethodHead, - Header: http.Header{ - apc.HdrCallerID: []string{t.SID()}, - apc.HdrCallerName: []string{t.callerName()}, - }, - Path: apc.URLPathObjects.Join(lom.Bck().Name, lom.ObjName), - Query: q, + Path: apc.URLPathObjects.Join(lom.Bck().Name, lom.ObjName), + Query: q, } args.ignoreMaintenance = true args.smap = smap diff --git a/api/apc/headers.go b/api/apc/headers.go index 676dc541f2..bba4bb7036 100644 --- a/api/apc/headers.go +++ b/api/apc/headers.go @@ -101,6 +101,9 @@ const ( // Promote(dir) HdrPromoteNamesHash = aisPrefix + "Promote-Names-Hash" HdrPromoteNamesNum = aisPrefix + "Promote-Names-Num" + + // EC + HdrActiveEC = aisPrefix + "Ec" ) const lais = len(aisPrefix) diff --git a/ec/bckencodexact.go b/ec/bckencodexact.go index 3f79326329..81ef4b5b6a 100644 --- a/ec/bckencodexact.go +++ b/ec/bckencodexact.go @@ -94,6 +94,8 @@ func (r *XactBckEncode) Run(wg *sync.WaitGroup) { return } + ECM.incActive(r) + opts := &mpather.JgroupOpts{ CTs: []string{fs.ObjectType}, VisitObj: r.bckEncode, diff --git a/ec/ec.go b/ec/ec.go index 3c48d11e93..50b3b5baac 100644 --- a/ec/ec.go +++ b/ec/ec.go @@ -207,7 +207,7 @@ func Init() { xreg.RegBckXact(&encFactory{}) if err := initManager(); err != nil { - cos.ExitLog("Failed to init manager:", err) + cos.ExitLog("Failed to initialize EC manager:", err) } } diff --git a/ec/getxaction.go b/ec/getxaction.go index 148cfb46f5..a0448a3f64 100644 --- a/ec/getxaction.go +++ b/ec/getxaction.go @@ -188,6 +188,8 @@ func (r *XactGet) Run(*sync.WaitGroup) { ticker := time.NewTicker(r.config.Periodic.StatsTime.D()) defer ticker.Stop() + ECM.incActive(r) + // as of now all requests are equal. 
Some may get throttling later for { select { diff --git a/ec/manager.go b/ec/manager.go index 57a4aec2a9..4457850d48 100644 --- a/ec/manager.go +++ b/ec/manager.go @@ -19,8 +19,10 @@ import ( "github.com/NVIDIA/aistore/core" "github.com/NVIDIA/aistore/core/meta" "github.com/NVIDIA/aistore/fs" + "github.com/NVIDIA/aistore/nl" "github.com/NVIDIA/aistore/transport" "github.com/NVIDIA/aistore/transport/bundle" + "github.com/NVIDIA/aistore/xact" "github.com/NVIDIA/aistore/xact/xreg" ) @@ -34,6 +36,9 @@ type Manager struct { reqBundle ratomic.Pointer[bundle.Streams] respBundle ratomic.Pointer[bundle.Streams] + // ref count + _refc atomic.Int32 + bundleEnabled atomic.Bool // to disable and enable on the fly } @@ -57,6 +62,22 @@ func initManager() (err error) { func (mgr *Manager) req() *bundle.Streams { return mgr.reqBundle.Load() } func (mgr *Manager) resp() *bundle.Streams { return mgr.respBundle.Load() } +func (mgr *Manager) IsActive() bool { return mgr._refc.Load() != 0 } + +func (mgr *Manager) incActive(xctn core.Xact) { + mgr._refc.Inc() + notif := &xact.NotifXact{ + Base: nl.Base{When: core.UponTerm, F: mgr.notifyTerm}, + Xact: xctn, + } + xctn.AddNotif(notif) +} + +func (mgr *Manager) notifyTerm(core.Notif, error, bool) { + rc := mgr._refc.Dec() + debug.Assert(rc >= 0, "rc: ", rc) +} + func (mgr *Manager) initECBundles() error { if !mgr.bundleEnabled.CAS(false, true) { return nil diff --git a/ec/putxaction.go b/ec/putxaction.go index 7ef9c5a9c6..0cb586b95a 100644 --- a/ec/putxaction.go +++ b/ec/putxaction.go @@ -159,6 +159,8 @@ func (r *XactPut) Run(*sync.WaitGroup) { go jog.run(&wg) } + ECM.incActive(r) + ticker := time.NewTicker(r.config.Periodic.StatsTime.D()) r.mainLoop(ticker) ticker.Stop() diff --git a/ec/respondxaction.go b/ec/respondxaction.go index 85fbb7a0aa..8b99f78fa6 100644 --- a/ec/respondxaction.go +++ b/ec/respondxaction.go @@ -80,6 +80,8 @@ func newRespondXact(bck *cmn.Bck, mgr *Manager) *XactRespond { func (r *XactRespond) Run(*sync.WaitGroup) { 
nlog.Infoln(r.Name()) + ECM.incActive(r) + ticker := time.NewTicker(r.config.Periodic.StatsTime.D()) defer ticker.Stop() diff --git a/ec/xaction.go b/ec/xaction.go index 61db808f1c..3ebbe98607 100644 --- a/ec/xaction.go +++ b/ec/xaction.go @@ -56,14 +56,6 @@ type ( } ) -func (r *xactECBase) init(config *cmn.Config, bck *cmn.Bck, mgr *Manager) { - r.stats = stats{bck: *bck} - r.config = config - r.bck = *bck - r.dOwner = &dataOwner{slices: make(map[string]*slice, 10)} - r.mgr = mgr -} - ///////////////// // xactReqBase // ///////////////// @@ -108,6 +100,14 @@ func (r *xactReqBase) ecRequestsEnabled() bool { // xactECBase // //////////////// +func (r *xactECBase) init(config *cmn.Config, bck *cmn.Bck, mgr *Manager) { + r.stats = stats{bck: *bck} + r.config = config + r.bck = *bck + r.dOwner = &dataOwner{slices: make(map[string]*slice, 10)} + r.mgr = mgr +} + func newSliceResponse(md *Metadata, attrs *cmn.ObjAttrs, fqn string) (reader cos.ReadOpenCloser, err error) { attrs.SetVersion(md.ObjVersion) attrs.Cksum = cos.NewCksum(md.CksumType, md.CksumValue) diff --git a/tools/readers/readers_test.go b/tools/readers/readers_test.go index 5a0191d83b..97ee7a7d93 100644 --- a/tools/readers/readers_test.go +++ b/tools/readers/readers_test.go @@ -246,7 +246,7 @@ func TestSGReader(t *testing.T) { defer mmsa.Terminate(false) { // Basic read - size := rand.Int64N(cos.MiB) + size := max(rand.Int64N(cos.MiB), cos.KiB+rand.Int64N(cos.KiB)) sgl := mmsa.NewSGL(size) defer sgl.Free()