Skip to content

Commit

Permalink
core: close/reopen EC (intra-cluster) streams on demand (major)
Browse files Browse the repository at this point in the history
* ref-count EC xactions (jobs)
  - `incActive` and notification callback
* EC active/inactive state is now cluster-wide information; works as
  follows:
* piggyback on keep-alive heartbeats
  - target =>  (fastKalive)          => primary
  - primary => (fastKalive response) => non-primary
* part one

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Sep 4, 2024
1 parent 9013791 commit d8a71bb
Show file tree
Hide file tree
Showing 15 changed files with 175 additions and 84 deletions.
87 changes: 48 additions & 39 deletions ais/htrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ func (h *htrun) Sowner() meta.Sowner { return h.owner.smap }
func (h *htrun) PageMM() *memsys.MMSA { return h.gmm }
func (h *htrun) ByteMM() *memsys.MMSA { return h.smm }

// errStopping builds the error returned by callers that bail out because the
// node is shutting down: the node's name followed by " is stopping".
func (h *htrun) errStopping() error {
	name := h.si.Name()
	return errors.New(name + " is stopping")
}

// NOTE: currently, only 'resume' (see also: kaSuspendMsg)
func (h *htrun) smapUpdatedCB(_, _ *smapX, nfl, ofl cos.BitFlags) {
if ofl.IsAnySet(meta.SnodeMaintDecomm) && !nfl.IsAnySet(meta.SnodeMaintDecomm) {
Expand Down Expand Up @@ -620,6 +624,8 @@ func (h *htrun) _call(si *meta.Snode, bargs *bcastArgs, results *bcastResults) {
freeCargs(cargs)
}

// _callHdrLen is the size hint passed to make(http.Header, ...) when a request
// header map has to be created for an intra-cluster call (see call, fastKalive).
const _callHdrLen = 5

func (h *htrun) call(args *callArgs, smap *smapX) (res *callResult) {
var (
req *http.Request
Expand All @@ -638,10 +644,6 @@ func (h *htrun) call(args *callArgs, smap *smapX) (res *callResult) {
args.req.Base = args.si.ControlNet.URL // by default, use intra-cluster control network
}

if args.req.Header == nil {
args.req.Header = make(http.Header)
}

switch args.timeout {
case apc.DefaultTimeout:
req, res.err = args.req.Req()
Expand Down Expand Up @@ -682,14 +684,18 @@ func (h *htrun) call(args *callArgs, smap *smapX) (res *callResult) {
return res
}

req.Header.Set(apc.HdrCallerID, h.SID())
req.Header.Set(apc.HdrCallerName, h.si.Name())
// req header
if args.req.Header == nil {
args.req.Header = make(http.Header, _callHdrLen)
}
if smap.vstr != "" {
if smap.IsPrimary(h.si) {
req.Header.Set(apc.HdrCallerIsPrimary, "true")
}
req.Header.Set(apc.HdrCallerSmapVer, smap.vstr)
}
req.Header.Set(apc.HdrCallerID, h.SID())
req.Header.Set(apc.HdrCallerName, h.si.Name())
req.Header.Set(cos.HdrUserAgent, ua)

resp, res.err = client.Do(req)
Expand Down Expand Up @@ -855,14 +861,6 @@ func (h *htrun) bcastNodes(bargs *bcastArgs) sliceResults {
if si.ID() == h.si.ID() {
continue
}

// TODO: remove
debug.Func(func() {
if si.URL(bargs.network) == h.si.URL(bargs.network) {
nlog.Errorf(fmtErrNetInfoChanged, h, si.StringEx(), si.URL(bargs.network))
}
})

if !bargs.ignoreMaintenance && si.InMaintOrDecomm() {
continue
}
Expand Down Expand Up @@ -1843,7 +1841,7 @@ func (h *htrun) join(query url.Values, htext htext, contactURLs ...string) (res
for range 4 { // retry
for _, candidateURL := range candidates {
if nlog.Stopping() {
return res, errors.New(h.String() + " is stopping")
return res, h.errStopping()
}
if resPrev != nil {
freeCR(resPrev)
Expand Down Expand Up @@ -1873,13 +1871,14 @@ func (h *htrun) join(query url.Values, htext htext, contactURLs ...string) (res
return res, fmt.Errorf("%s: failed to discover a new smap", h)
}
if nsti.Smap.Version < smap.version() {
return res, fmt.Errorf("%s: current %s version is newer than %d from the primary proxy (%s)", h, smap, nsti.Smap.Version, nsti.Smap.Primary.ID)
return res, fmt.Errorf("%s: current %s version is newer than %d from the primary proxy (%s)",
h, smap, nsti.Smap.Version, nsti.Smap.Primary.ID)
}
primaryURL = nsti.Smap.Primary.PubURL

// Daemon is stopping skip register
if nlog.Stopping() {
return res, errors.New(h.String() + " is stopping")
return res, h.errStopping()
}
res = h.regTo(primaryURL, nil, apc.DefaultTimeout, query, htext, false /*keepalive*/)
if res.err == nil {
Expand Down Expand Up @@ -1940,43 +1939,53 @@ func (h *htrun) regTo(url string, psi *meta.Snode, tout time.Duration, q url.Val
return res
}

func (h *htrun) sendKalive(smap *smapX, htext htext, timeout time.Duration, fast bool) (pid string, status int, err error) {
// (fast path: nodes => primary)
func (h *htrun) fastKalive(smap *smapX, timeout time.Duration, ecActive bool) (pid string, hdr http.Header, err error) {
if nlog.Stopping() {
err = errors.New(h.String() + " is stopping")
return
return "", hdr, h.errStopping()
}
debug.Assert(h.ClusterStarted())

primaryURL, psi := h.getPrimaryURLAndSI(smap, nil)
pid = psi.ID()

if fast {
// fast path
debug.Assert(h.ClusterStarted())
path := apc.URLPathCluKalive.Join(h.SID())
cargs := allocCargs()
{
cargs.si = psi
cargs.req = cmn.HreqArgs{Method: http.MethodPost, Base: primaryURL, Path: path}
cargs.timeout = timeout
}
res := h.call(cargs, smap)
freeCargs(cargs)
err = res.err
freeCR(res)
return
cargs := allocCargs()
{
cargs.si = psi
cargs.req = cmn.HreqArgs{Method: http.MethodPost, Base: primaryURL, Path: apc.URLPathCluKalive.Join(h.SID())}
cargs.timeout = timeout
}
if ecActive {
hdr := make(http.Header, _callHdrLen)
hdr.Set(apc.HdrActiveEC, "true")
cargs.req.Header = hdr
}

res := h.call(cargs, smap)
freeCargs(cargs)
err, hdr = res.err, res.header

freeCR(res)
return pid, hdr, err
}

// slowKalive is the slow keepalive path (nodes => primary): it goes through
// regTo with keepalive=true instead of the lightweight fast-keepalive call.
//
// Returns:
//   - pid: ID of the node the keepalive was sent to (the current primary);
//   - status, err: taken from the regTo result when it failed; zero/nil otherwise.
//
// Returns errStopping() right away when the node is shutting down.
// An error containing ciePrefix (cluster integrity error) is passed to cos.ExitLog.
func (h *htrun) slowKalive(smap *smapX, htext htext, timeout time.Duration) (pid string, status int, err error) {
	if nlog.Stopping() {
		return "", 0, h.errStopping()
	}
	primaryURL, psi := h.getPrimaryURLAndSI(smap, nil)
	pid = psi.ID()

	res := h.regTo(primaryURL, psi, timeout, nil, htext, true /*keepalive*/)
	if res.err != nil {
		if strings.Contains(res.err.Error(), ciePrefix) {
			cos.ExitLog(res.err) // FATAL: cluster integrity error (cie)
		}
		status, err = res.status, res.err
	}
	// single release point and single (reachable) return for both outcomes
	freeCR(res)
	return pid, status, err
}

func (h *htrun) getPrimaryURLAndSI(smap *smapX, config *cmn.Config) (string, *meta.Snode) {
Expand Down
47 changes: 34 additions & 13 deletions ais/kalive.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/NVIDIA/aistore/cmn/mono"
"github.com/NVIDIA/aistore/cmn/nlog"
"github.com/NVIDIA/aistore/core/meta"
"github.com/NVIDIA/aistore/ec"
"github.com/NVIDIA/aistore/stats"
)

Expand All @@ -37,8 +38,8 @@ const (

type (
keepaliver interface {
sendKalive(*smapX, time.Duration, bool) (string, int, error)
heardFrom(sid string)
sendKalive(*smapX, time.Duration, int64 /*now*/, bool) (string, int, error)
heardFrom(sid string) int64
do(config *cmn.Config) (stopped bool)
timeToPing(sid string) bool
ctrl(msg string)
Expand Down Expand Up @@ -73,8 +74,8 @@ type (
}

hbTracker interface {
HeardFrom(id string, now int64) // callback for 'id' to respond
TimedOut(id string) bool // true if 'id` didn't keepalive or called (via "heard") within the interval (above)
HeardFrom(id string, now int64) int64 // callback for 'id' to respond
TimedOut(id string) bool // true if 'id` didn't keepalive or called (via "heard") within the interval (above)

reg(id string)
set(interval time.Duration) bool
Expand Down Expand Up @@ -137,12 +138,18 @@ func (tkr *talive) cluUptime(now int64) (elapsed time.Duration) {
return
}

func (tkr *talive) sendKalive(smap *smapX, timeout time.Duration, fast bool) (string, int, error) {
func (tkr *talive) sendKalive(smap *smapX, timeout time.Duration, _ int64, fast bool) (pid string, status int, err error) {
if fast {
// additionally
interrupted, restarted := tkr.t.interruptedRestarted()
fast = !interrupted && !restarted
}
return tkr.t.sendKalive(smap, tkr.t, timeout, fast)
if fast {
debug.Assert(ec.ECM != nil)
pid, _, err = tkr.t.fastKalive(smap, timeout, ec.ECM.IsActive())
return pid, 0, err
}
return tkr.t.slowKalive(smap, tkr.t, timeout)
}

func (tkr *talive) do(config *cmn.Config) (stopped bool) {
Expand Down Expand Up @@ -200,9 +207,22 @@ func (pkr *palive) cluUptime(now int64) (elapsed time.Duration) {
return
}

func (pkr *palive) sendKalive(smap *smapX, timeout time.Duration, fast bool) (string, int, error) {
func (pkr *palive) sendKalive(smap *smapX, timeout time.Duration, now int64, fast bool) (string, int, error) {
debug.Assert(!smap.isPrimary(pkr.p.si))
return pkr.p.htrun.sendKalive(smap, nil /*htext*/, timeout, fast)

if fast {
pid, hdr, err := pkr.p.fastKalive(smap, timeout, false /*ec active*/)
if err == nil {
// check resp header from primary
// (see: _respActiveEC; compare with: _recvActiveEC)
if isActiveEC(hdr) {
pkr.p.lastEC.Store(now)
}
}
return pid, 0, err
}

return pkr.p.slowKalive(smap, nil /*htext*/, timeout)
}

func (pkr *palive) do(config *cmn.Config) (stopped bool) {
Expand Down Expand Up @@ -432,8 +452,8 @@ func (pkr *palive) retry(si *meta.Snode, ticker *time.Ticker, timeout time.Durat

func (k *keepalive) Name() string { return k.name }

func (k *keepalive) heardFrom(sid string) {
k.hb.HeardFrom(sid, 0 /*now*/)
func (k *keepalive) heardFrom(sid string) int64 {
return k.hb.HeardFrom(sid, 0 /*now*/)
}

// wait for stats-runner to set startedUp=true
Expand Down Expand Up @@ -538,7 +558,7 @@ func (k *keepalive) do(smap *smapX, si *meta.Snode, config *cmn.Config) (stopped
return
}
fast = k.k.cluUptime(started) > max(k.interval<<2, config.Timeout.Startup.D()>>1)
cpid, status, err := k.k.sendKalive(smap, timeout, fast)
cpid, status, err := k.k.sendKalive(smap, timeout, started, fast)
if err == nil {
now := mono.NanoTime()
k.statsT.Add(stats.KeepAliveLatency, now-started)
Expand All @@ -564,7 +584,7 @@ func (k *keepalive) do(smap *smapX, si *meta.Snode, config *cmn.Config) (stopped
// and therefore not skipping keepalive req (compare with palive.retry)
i++
started := mono.NanoTime()
pid, status, err = k.k.sendKalive(nil, timeout, false)
pid, status, err = k.k.sendKalive(nil, timeout, started, false)
if pid == si.ID() {
return // elected as primary
}
Expand Down Expand Up @@ -622,7 +642,7 @@ func (k *keepalive) paused() bool { return k.tickerPaused.Load() }

func newHB(interval time.Duration) *heartBeat { return &heartBeat{interval: interval} }

func (hb *heartBeat) HeardFrom(id string, now int64) {
func (hb *heartBeat) HeardFrom(id string, now int64) int64 {
var (
val *int64
v, ok = hb.last.Load(id)
Expand All @@ -637,6 +657,7 @@ func (hb *heartBeat) HeardFrom(id string, now int64) {
hb.last.Store(id, val)
}
ratomic.StoreInt64(val, now)
return now
}

func (hb *heartBeat) TimedOut(id string) bool {
Expand Down
2 changes: 2 additions & 0 deletions ais/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type (
mu sync.RWMutex
in atomic.Bool
}
lastEC atomic.Int64 // last active EC via apc.HdrActiveEC (mono time)

settingNewPrimary atomic.Bool // primary executing "set new primary" request (state)
readyToFastKalive atomic.Bool // primary can accept fast keepalives
}
Expand Down
45 changes: 40 additions & 5 deletions ais/prxclu.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func (p *proxy) httpclupost(w http.ResponseWriter, r *http.Request) {
case apc.Keepalive:
// fast path(?)
if len(apiItems) > 1 {
p.fastKalive(w, r, smap, config, apiItems[1])
p.fastKaliveRsp(w, r, smap, config, apiItems[1] /*sid*/)
return
}

Expand Down Expand Up @@ -567,7 +567,8 @@ func (p *proxy) httpclupost(w http.ResponseWriter, r *http.Request) {
go p.mcastJoined(nsi, msg, nsi.Flags, &regReq)
}

func (p *proxy) fastKalive(w http.ResponseWriter, r *http.Request, smap *smapX, config *cmn.Config, sid string) {
// respond to fastKalive from nodes
func (p *proxy) fastKaliveRsp(w http.ResponseWriter, r *http.Request, smap *smapX, config *cmn.Config, sid string) {
fast := p.readyToFastKalive.Load()
if !fast {
var (
Expand All @@ -586,7 +587,13 @@ func (p *proxy) fastKalive(w http.ResponseWriter, r *http.Request, smap *smapX,
)
if callerID == sid && callerSver != "" && callerSver == smap.vstr {
if si := smap.GetNode(sid); si != nil {
p.keepalive.heardFrom(sid)
now := p.keepalive.heardFrom(sid)

if si.IsTarget() {
p._recvActiveEC(r.Header, now)
} else {
p._respActiveEC(w.Header(), now)
}
return
}
}
Expand Down Expand Up @@ -965,7 +972,7 @@ func (p *proxy) httpcluput(w http.ResponseWriter, r *http.Request) {
}

if nlog.Stopping() {
p.writeErr(w, r, fmt.Errorf("%s is stopping", p), http.StatusServiceUnavailable)
p.writeErr(w, r, p.errStopping(), http.StatusServiceUnavailable)
return
}
if !p.NodeStarted() {
Expand Down Expand Up @@ -2202,7 +2209,7 @@ func (p *proxy) _unregNodePre(ctx *smapModifier, clone *smapX) error {
// rebalance's `can`: factors not including cluster map
func (p *proxy) canRebalance() (err error) {
if nlog.Stopping() {
return fmt.Errorf("%s is stopping", p)
return p.errStopping()
}
smap := p.owner.smap.get()
if err = smap.validate(); err != nil {
Expand Down Expand Up @@ -2258,3 +2265,31 @@ func mustRebalance(ctx *smapModifier, cur *smapX) bool {
}
return false
}

//
// (EC is active) and (is EC active?) via fastKalive
//

// default config.Timeout.EcStreams
// ecTimeout is the window used by _respActiveEC: the primary keeps setting
// apc.HdrActiveEC in its keepalive responses while the last recorded EC
// activity (proxy.lastEC) is less than this long ago.
const (
ecTimeout = 10 * time.Minute
)

// isActiveEC reports whether the header map carries the active-EC marker
// (apc.HdrActiveEC); only the key's presence matters, not its value.
//
// The lookup key is canonicalized: writers in this file use Header.Set, which
// stores keys in canonical form, so a raw map index would silently miss the
// entry if the constant were ever spelled non-canonically.
func isActiveEC(hdr http.Header) (ok bool) {
	_, ok = hdr[http.CanonicalHeaderKey(apc.HdrActiveEC)]
	return ok
}

// (target send => primary)
// _recvActiveEC records `now` as the time of the last EC activity when the
// request header carries the active-EC marker; otherwise it does nothing.
func (p *proxy) _recvActiveEC(rhdr http.Header, now int64) {
	if !isActiveEC(rhdr) {
		return
	}
	p.lastEC.Store(now)
}

// (primary resp => non-primary)
// _respActiveEC sets apc.HdrActiveEC: "true" in the response header when the
// last recorded EC activity (p.lastEC) is less than ecTimeout before `now`.
//
// A zero p.lastEC means nothing has been stored yet; it is not a timestamp and
// must not count as recent activity (otherwise any `now` smaller than
// ecTimeout would advertise EC as active without a single report).
// NOTE(review): assumes 0 is never a legitimate stored timestamp - confirm.
func (p *proxy) _respActiveEC(whdr http.Header, now int64) {
	last := p.lastEC.Load()
	if last == 0 {
		return
	}
	if time.Duration(now-last) < ecTimeout {
		whdr.Set(apc.HdrActiveEC, "true")
	}
}
8 changes: 4 additions & 4 deletions ais/prxnotif_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ import (

type nopHB struct{}

// nopHB method set: every method is a no-op returning the zero value.
// (HeardFrom returns int64, matching its use via `return k.hb.HeardFrom(...)`.)
func (*nopHB) HeardFrom(string, int64) int64 { return 0 }
func (*nopHB) TimedOut(string) bool          { return false }
func (*nopHB) reg(string)                    {}
func (*nopHB) set(time.Duration) bool        { return false }

var _ hbTracker = (*nopHB)(nil)

Expand Down
Loading

0 comments on commit d8a71bb

Please sign in to comment.