core: retry primary keepalive (part four)
* with refactoring
* prev. commits: 9c961d5, e3503fc
* separately: improve CLI 'ais log get' inline help

Signed-off-by: Alex Aizman <[email protected]>
alex-aizman committed Sep 23, 2024
1 parent fea2b63 commit 89d06db
Showing 8 changed files with 67 additions and 52 deletions.
2 changes: 1 addition & 1 deletion ais/htcommon.go
@@ -726,7 +726,7 @@ func (smap *smapX) fill(nsti *cos.NodeStateInfo) {

func (c *getMaxCii) do(si *meta.Snode, wg cos.WG, smap *smapX) {
var nsti *cos.NodeStateInfo
body, _, err := c.h.reqHealth(si, c.timeout, c.query, smap)
body, _, err := c.h.reqHealth(si, c.timeout, c.query, smap, false /*retry pub-addr*/)
if err != nil {
goto ret
}
10 changes: 5 additions & 5 deletions ais/htrun.go
@@ -1413,7 +1413,7 @@ func (h *htrun) isValidObjname(w http.ResponseWriter, r *http.Request, name stri
}

// health client
func (h *htrun) reqHealth(si *meta.Snode, tout time.Duration, q url.Values, smap *smapX, retry ...int) ([]byte, int, error) {
func (h *htrun) reqHealth(si *meta.Snode, tout time.Duration, q url.Values, smap *smapX, retry bool) ([]byte, int, error) {
var (
path = apc.URLPathHealth.S
url = si.URL(cmn.NetIntraControl)
@@ -1428,10 +1428,10 @@ func (h *htrun) reqHealth(si *meta.Snode, tout time.Duration, q url.Values, smap
b, status, err := res.bytes, res.status, res.err
freeCR(res)

if err != nil && len(retry) > 0 {
if err != nil && retry {
// [NOTE] retrying when:
// - about to remove the node from the cluster map, or
// - about to elect new primary
// - about to remove node 'si' from the cluster map, or
// - about to elect a new primary;
// not checking `IsErrDNSLookup` and similar - ie., not trying to narrow down
// (compare w/ slow-keepalive)
if si.PubNet.Hostname != si.ControlNet.Hostname {
@@ -2077,7 +2077,7 @@ func (h *htrun) pollClusterStarted(config *cmn.Config, psi *meta.Snode) (maxNsti
nlog.Warningln(h.String(), "started as a non-primary and got _elected_ during startup")
return
}
if _, _, err := h.reqHealth(smap.Primary, healthTimeout, query /*ask primary*/, smap); err == nil {
if _, _, err := h.reqHealth(smap.Primary, healthTimeout, query /*ask primary*/, smap, false /*retry pub-addr*/); err == nil {
// log
s := fmt.Sprintf("%s via primary health: cluster startup Ok, %s", h.si, smap.StringEx())
if self := smap.GetNode(h.si.ID()); self == nil {
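The refactored reqHealth takes an explicit `retry bool` instead of the previous variadic `retry ...int`, and the [NOTE] comment spells out when callers ask for a retry. Below is a minimal, self-contained sketch of that call shape and of the public-address fallback implied by the `si.PubNet.Hostname != si.ControlNet.Hostname` check; types, addresses, and the ping itself are simplified stand-ins, not the actual AIStore code:

```go
// Illustrative stand-in for the reqHealth refactoring in this commit: the
// variadic `retry ...int` parameter becomes an explicit bool, and a failed
// intra-control ping may be retried once via the node's public address when
// the two differ.
package main

import (
	"errors"
	"fmt"
	"time"
)

type snode struct {
	ctrlAddr string // intra-control address (tried first)
	pubAddr  string // public address (fallback)
}

// simulated outage of the intra-control interface
var down = map[string]bool{"10.0.0.2:51081": true}

func ping(addr string, tout time.Duration) error {
	_ = tout // a real implementation would GET /v1/health with this timeout
	if down[addr] {
		return errors.New("connection refused: " + addr)
	}
	return nil
}

func reqHealth(si *snode, tout time.Duration, retry bool) error {
	err := ping(si.ctrlAddr, tout)
	if err != nil && retry && si.pubAddr != si.ctrlAddr {
		err = ping(si.pubAddr, tout) // one more attempt via the public network
	}
	return err
}

func main() {
	si := &snode{ctrlAddr: "10.0.0.2:51081", pubAddr: "203.0.113.7:51080"}
	fmt.Println(reqHealth(si, time.Second, false /*retry pub-addr*/)) // connection refused
	fmt.Println(reqHealth(si, time.Second, true /*retry pub-addr*/))  // <nil>
}
```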
49 changes: 28 additions & 21 deletions ais/kalive.go
@@ -233,7 +233,7 @@ func (pkr *palive) do(config *cmn.Config) (stopped bool) {
}
if smap.isPrimary(pkr.p.si) {
if !pkr.inProgress.CAS(false, true) {
nlog.Infoln(pkr.p.String() + ": primary keepalive in progress")
nlog.Warningln(pkr.p.String(), "primary keepalive in progress") // NOTE: see wg.Wait() below
return
}
stopped = pkr.updateSmap(config)
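For context, the hunk above relies on the `inProgress.CAS(false, true)` guard to keep at most one primary-keepalive round in flight. A minimal sketch of that pattern, using standard `sync/atomic`; everything else is illustrative:

```go
// An atomic compare-and-swap keeps at most one keepalive round running;
// a concurrent round is skipped with a warning instead of queuing up.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type palive struct{ inProgress atomic.Bool }

func (pkr *palive) do(id int) {
	if !pkr.inProgress.CompareAndSwap(false, true) {
		fmt.Printf("round %d: primary keepalive in progress - skipping\n", id)
		return
	}
	defer pkr.inProgress.Store(false)
	time.Sleep(50 * time.Millisecond) // stand-in for updateSmap()
	fmt.Printf("round %d: done\n", id)
}

func main() {
	var (
		pkr palive
		wg  sync.WaitGroup
	)
	for i := 1; i <= 2; i++ { // two concurrent rounds; at most one updates at a time
		wg.Add(1)
		go func(id int) { defer wg.Done(); pkr.do(id) }(i)
	}
	wg.Wait()
}
```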
@@ -249,8 +249,8 @@ func (pkr *palive) do(config *cmn.Config) (stopped bool) {
return
}

// updateSmap pings all nodes in parallel. Non-responding nodes get removed from the Smap and
// the resulting map is then metasync-ed.
// keep-alive nodes in parallel; nodes that fail to respond get removed from the cluster map (Smap)
// (see 'maintenance-mode' comment below)
func (pkr *palive) updateSmap(config *cmn.Config) (stopped bool) {
var (
p = pkr.p
@@ -277,7 +277,7 @@ func (pkr *palive) updateSmap(config *cmn.Config) (stopped bool) {

// direct call first
started := mono.NanoTime()
if _, _, err := pkr.p.reqHealth(si, config.Timeout.CplaneOperation.D(), nil, smap); err == nil {
if _, _, err := pkr.p.reqHealth(si, config.Timeout.CplaneOperation.D(), nil, smap, false /*retry pub-addr*/); err == nil {
now := mono.NanoTime()
pkr.statsT.Add(stats.KeepAliveLatency, now-started)
pkr.hb.HeardFrom(si.ID(), now) // effectively, yes
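The updated comment describes the overall flow: keep-alive all nodes in parallel and remove non-responders from the cluster map (Smap). A rough, self-contained sketch of that fan-out, with made-up node IDs and a simulated health check:

```go
// Ping all nodes in parallel, record latency for responders, and collect
// non-responders as candidates for removal from the cluster map.
package main

import (
	"fmt"
	"sync"
	"time"
)

func health(id string) error {
	if id == "t3" { // simulate one silent node
		return fmt.Errorf("%s: no response", id)
	}
	return nil
}

func main() {
	var (
		nodes   = []string{"t1", "t2", "t3", "p2"}
		wg      sync.WaitGroup
		mu      sync.Mutex
		removed []string
	)
	for _, id := range nodes {
		wg.Add(1)
		go func(id string) {
			defer wg.Done()
			started := time.Now()
			if err := health(id); err != nil {
				mu.Lock()
				removed = append(removed, id) // candidate for Smap removal
				mu.Unlock()
				return
			}
			fmt.Println(id, "ok in", time.Since(started))
		}(id)
	}
	wg.Wait()
	fmt.Println("to be removed from the cluster map:", removed)
}
```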
@@ -310,6 +310,7 @@ func (pkr *palive) updateSmap(config *cmn.Config) (stopped bool) {
return
}

// "slow-ping"
func (pkr *palive) goping(si *meta.Snode, wg cos.WG, smap *smapX, config *cmn.Config) {
if len(pkr.stoppedCh) > 0 {
wg.Done()
@@ -327,23 +328,23 @@ func (pkr *palive) goping(si *meta.Snode, wg cos.WG, smap *smapX, config *cmn.Co

func (pkr *palive) _pingRetry(si *meta.Snode, smap *smapX, config *cmn.Config) (ok, stopped bool) {
var (
timeout = config.Timeout.CplaneOperation.D()
tout = config.Timeout.CplaneOperation.D()
started = mono.NanoTime()
)
_, status, err := pkr.p.reqHealth(si, timeout, nil, smap)
_, status, err := pkr.p.reqHealth(si, tout, nil, smap, true /*retry via pub-addr, if different*/)
if err == nil {
now := mono.NanoTime()
pkr.statsT.Add(stats.KeepAliveLatency, now-started)
pkr.hb.HeardFrom(si.ID(), now) // effectively, yes
return true, false
}

tout := config.Timeout.MaxKeepalive.String()
tout = config.Timeout.MaxKeepalive.D()
nlog.Warningln("failed to slow-ping", si.StringEx(), "- retrying [", err, status, tout, smap.StringEx(), "]")
pkr.statsT.IncErr(stats.ErrKaliveCount)

ticker := time.NewTicker(cmn.KeepaliveRetryDuration(config))
ok, stopped = pkr.retry(si, ticker, config.Timeout.MaxKeepalive.D())
ok, stopped = pkr.retry(si, ticker, tout)
ticker.Stop()

return ok, stopped
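_pingRetry escalates from the short control-plane timeout to the max-keepalive timeout and then keeps retrying on a ticker (cf. `cmn.KeepaliveRetryDuration`). A hedged sketch of that escalation, with made-up durations and a simulated probe:

```go
// First probe uses the short control-plane timeout; on failure, subsequent
// probes run on a ticker and use the longer max-keepalive timeout.
package main

import (
	"errors"
	"fmt"
	"time"
)

const (
	cplaneTimeout = 100 * time.Millisecond // short, first attempt
	maxKeepalive  = 300 * time.Millisecond // longer, retries
	retryEvery    = 150 * time.Millisecond // cf. KeepaliveRetryDuration
	maxRetries    = 3
)

var attempts int

func probe(tout time.Duration) error {
	attempts++
	if attempts < 3 { // simulate success on the third attempt
		return errors.New("timeout after " + tout.String())
	}
	return nil
}

func pingRetry() (ok bool) {
	if probe(cplaneTimeout) == nil {
		return true
	}
	fmt.Println("failed to slow-ping - retrying with", maxKeepalive)
	ticker := time.NewTicker(retryEvery)
	defer ticker.Stop()
	for i := 0; i < maxRetries; i++ {
		<-ticker.C
		if probe(maxKeepalive) == nil {
			return true
		}
	}
	return false
}

func main() { fmt.Println("responded:", pingRetry()) }
```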
@@ -411,7 +412,7 @@ func (pkr *palive) _final(ctx *smapModifier, clone *smapX) {
_ = pkr.p.metasyncer.sync(revsPair{clone, msg})
}

func (pkr *palive) retry(si *meta.Snode, ticker *time.Ticker, timeout time.Duration) (ok, stopped bool) {
func (pkr *palive) retry(si *meta.Snode, ticker *time.Ticker, tout time.Duration) (ok, stopped bool) {
var i int
for {
if !pkr.timeToPing(si.ID()) {
@@ -426,8 +427,7 @@ func (pkr *palive) retry(si *meta.Snode, ticker *time.Ticker, timeout time.Durat
started = mono.NanoTime()
smap = pkr.p.owner.smap.get()
)
// retry via pub-addr if different (compare with slowKalive)
_, status, err := pkr.p.reqHealth(si, timeout, nil, smap, 1 /*retry*/)
_, status, err := pkr.p.reqHealth(si, tout, nil, smap, true /*retry via pub-addr, if different*/)
if err == nil {
now := mono.NanoTime()
pkr.statsT.Add(stats.KeepAliveLatency, now-started)
@@ -561,14 +561,16 @@ func (k *keepalive) configUpdate(cfg *cmn.KeepaliveTrackerConf) {
func (k *keepalive) do(smap *smapX, si *meta.Snode, config *cmn.Config) (stopped bool) {
var (
pid = smap.Primary.ID()
timeout = config.Timeout.CplaneOperation.D()
pname = meta.Pname(pid)
tout = config.Timeout.CplaneOperation.D()
started = mono.NanoTime()
sname = si.String()
)
if nlog.Stopping() {
return
}
fast := k.k.cluUptime(started) > max(k.interval<<2, config.Timeout.Startup.D()>>1)
cpid, status, err := k.k.sendKalive(smap, timeout, started, fast)
cpid, status, err := k.k.sendKalive(smap, tout, started, fast)
if err == nil {
now := mono.NanoTime()
k.statsT.Add(stats.KeepAliveLatency, now-started)
@@ -580,9 +582,9 @@ func (k *keepalive) do(smap *smapX, si *meta.Snode, config *cmn.Config) (stopped

debug.Assert(cpid == pid && cpid != si.ID(), pid+", "+cpid+", "+si.ID())
if status != 0 {
nlog.Warningln(si.String(), "=>", meta.Pname(pid), "keepalive failed: [", err, status, "]")
nlog.Warningln(sname, "=>", pname, "keepalive failed: [", err, status, "]")
} else {
nlog.Warningln(si.String(), "=>", meta.Pname(pid), "keepalive failed:", err)
nlog.Warningln(sname, "=>", pname, "keepalive failed:", err)
}

//
@@ -600,22 +602,27 @@ func (k *keepalive) do(smap *smapX, si *meta.Snode, config *cmn.Config) (stopped
// and therefore not skipping keepalive req (compare with palive.retry)
i++
started := mono.NanoTime()
pid, status, err = k.k.sendKalive(nil, timeout, started, false /*fast*/)
pid, status, err = k.k.sendKalive(nil, tout, started, false /*fast*/)
if pid == si.ID() {
return // elected as primary
}
pname = meta.Pname(pid)
if err == nil {
now := mono.NanoTime()
k.statsT.Add(stats.KeepAliveLatency, now-started)
k.hb.HeardFrom(pid, now) // effectively, yes
nlog.Infof("%s: OK after %d attempt%s", si, i, cos.Plural(i))
if i == 1 {
nlog.Infoln(sname, "=>", pname, "OK after 1 attempt")
} else {
nlog.Infoln(sname, "=>", pname, "OK after", i, "attempts")
}
return
}
// repeat up to `kaNumRetries` with the max timeout
timeout = config.Timeout.MaxKeepalive.D()
// repeat up to `kaNumRetries` times with max-keepalive timeout
tout = config.Timeout.MaxKeepalive.D()

if i == kaNumRetries {
nlog.Warningf("%s: failed %d attempts => %s (primary)", si, i, meta.Pname(pid))
nlog.Warningln(sname, "=>", pname, "failed after", i, "attempts")
return true
}
if cos.IsUnreachable(err, status) {
Expand All @@ -624,7 +631,7 @@ func (k *keepalive) do(smap *smapX, si *meta.Snode, config *cmn.Config) (stopped
if nlog.Stopping() {
return true
}
err = fmt.Errorf("%s: unexpected response from %s: %v(%d)", si, meta.Pname(pid), err, status)
err = fmt.Errorf("%s: unexpected response from %s: %v(%d)", sname, pname, err, status)
debug.AssertNoErr(err)
nlog.Warningln(err)
case sig := <-k.controlCh:
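On the non-primary side, `keepalive.do` retries the primary up to `kaNumRetries` times, switching to the max-keepalive timeout after the first failure and bailing out early if this node itself got elected. A simplified sketch of that loop; names and timeouts are illustrative, not the AIStore sources:

```go
// Retry the primary a bounded number of times with an escalated timeout;
// stop early if this node was elected primary in the meantime.
package main

import (
	"errors"
	"fmt"
	"time"
)

const kaNumRetries = 3

var (
	selfID    = "p5"
	primaryUp = false // simulate an unresponsive primary
)

// returns the ID of whoever answered as primary (may be us after an election)
func sendKalive(tout time.Duration) (pid string, err error) {
	if !primaryUp {
		return "", errors.New("primary unreachable (timeout " + tout.String() + ")")
	}
	return "p1", nil
}

func keepalivePrimary(cplaneTout, maxTout time.Duration) (stopped bool) {
	tout := cplaneTout
	for i := 1; i <= kaNumRetries; i++ {
		pid, err := sendKalive(tout)
		if pid == selfID {
			return false // got elected primary - nothing to keep alive
		}
		if err == nil {
			fmt.Printf("OK after %d attempt(s)\n", i)
			return false
		}
		tout = maxTout // escalate the timeout for the remaining attempts
	}
	fmt.Println("failed after", kaNumRetries, "attempts - requesting election")
	return true
}

func main() {
	fmt.Println("stopped:", keepalivePrimary(100*time.Millisecond, 300*time.Millisecond))
}
```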
27 changes: 19 additions & 8 deletions ais/prxclu.go
@@ -1597,6 +1597,7 @@ func (p *proxy) _rebPostRm(ctx *smapModifier, clone *smapX) {
}

func (p *proxy) stopMaintenance(w http.ResponseWriter, r *http.Request, msg *apc.ActMsg) {
const tag = "stop-maintenance:"
var (
opts apc.ActValRmNode
smap = p.owner.smap.get()
@@ -1612,31 +1613,41 @@ func (p *proxy) stopMaintenance(w http.ResponseWriter, r *http.Request, msg *apc
return
}

nlog.Infof("%s: %s(%s) opts=%v", p, msg.Action, si.StringEx(), opts)
sname := si.StringEx()
pname := p.String()
nlog.Infoln(tag, pname, "[", msg.Action, sname, opts, "]")

if !smap.InMaint(si) {
p.writeErrf(w, r, "node %s is not in maintenance mode - nothing to do", si.StringEx())
return
}
timeout := cmn.GCO.Get().Timeout.CplaneOperation.D()
if _, status, err := p.reqHealth(si, timeout, nil, smap); err != nil {
sleep, retries := timeout/2, 5
tout := cmn.Rom.CplaneOperation()
if _, status, err := p.reqHealth(si, tout, nil, smap, false /*retry pub-addr*/); err != nil {
// TODO -- FIXME: use cmn.KeepaliveRetryDuration()
sleep, retries := tout/2, 4

time.Sleep(sleep)
for range retries { // retry
for i := range retries { // retry
time.Sleep(sleep)
_, status, err = p.reqHealth(si, timeout, nil, smap)
_, status, err = p.reqHealth(si, tout, nil, smap, true /*retry pub-addr*/)
if err == nil {
if i == 1 {
nlog.Infoln(tag, pname, "=>", sname, "OK after 1 attempt [", msg.Action, opts, "]")
} else {
nlog.Infoln(tag, pname, "=>", sname, "OK after", i, "attempts [", msg.Action, opts, "]")
}
break
}
if status != http.StatusServiceUnavailable {
p.writeErrf(w, r, "%s is unreachable: %v(%d)", si, err, status)
p.writeErrf(w, r, "%s is unreachable: %v(%d)", sname, err, status)
return
}
sleep = min(sleep+time.Second, tout)
}
if err != nil {
debug.Assert(status == http.StatusServiceUnavailable)
nlog.Errorf("%s: node %s takes unusually long time to start: %v(%d) - proceeding anyway",
p.si, si, err, status)
pname, sname, err, status)
}
}

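The stopMaintenance change adds a bounded retry with a linear, capped backoff (`sleep = min(sleep+time.Second, tout)`) while the node leaving maintenance comes back up, treating any status other than 503 as a hard error. A rough sketch of that loop with placeholder values and a simulated health call:

```go
// Wait for a node leaving maintenance to answer health checks, sleeping a
// little longer between attempts (capped at tout); anything other than 503
// is a hard error. Requires Go 1.21+ for the min builtin.
package main

import (
	"errors"
	"fmt"
	"net/http"
	"time"
)

var calls int

func health() (status int, err error) {
	calls++
	if calls < 3 { // simulate a node that is still starting up
		return http.StatusServiceUnavailable, errors.New("service unavailable")
	}
	return http.StatusOK, nil
}

func waitForNode(tout time.Duration) error {
	sleep, retries := tout/2, 4
	for i := 1; i <= retries; i++ {
		time.Sleep(sleep)
		status, err := health()
		if err == nil {
			fmt.Printf("node is up after %d attempt(s)\n", i)
			return nil
		}
		if status != http.StatusServiceUnavailable {
			return fmt.Errorf("node is unreachable: %v(%d)", err, status)
		}
		sleep = min(sleep+time.Second, tout) // linear backoff, capped at tout
	}
	return errors.New("node takes unusually long to start - proceeding anyway")
}

func main() {
	if err := waitForNode(time.Second); err != nil {
		fmt.Println(err)
	}
}
```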
2 changes: 1 addition & 1 deletion ais/tgtimpl.go
@@ -34,7 +34,7 @@ func (*target) GetAllRunning(inout *core.AllRunningInOut, periodic bool) {
}

func (t *target) Health(si *meta.Snode, timeout time.Duration, query url.Values) ([]byte, int, error) {
return t.reqHealth(si, timeout, query, t.owner.smap.get())
return t.reqHealth(si, timeout, query, t.owner.smap.get(), false /*retry*/)
}

func (t *target) Backend(bck *meta.Bck) core.Backend {
10 changes: 3 additions & 7 deletions ais/vote.go
@@ -228,8 +228,7 @@ func (p *proxy) elect(vr *VoteRecord, xele *xs.Election) {
curName = curPrimary.StringEx()
pname = p.String()
pnameC = pname + ":"
config = cmn.GCO.Get()
timeout = config.Timeout.CplaneOperation.D() / 2
tout = cmn.Rom.CplaneOperation()
)
// 1. ping the current primary (not using `apc.QparamAskPrimary` as the latter might be transitioning)
for i := range retryCurPrimary {
@@ -243,19 +242,16 @@ func (p *proxy) elect(vr *VoteRecord, xele *xs.Election) {
return
}

// retry via pub-addr if different (compare with palive.retry)
_, _, err = p.reqHealth(curPrimary, timeout, nil /*ask primary*/, smap, 1 /*retry*/)
_, _, err = p.reqHealth(curPrimary, tout, nil /*ask primary*/, smap, true /*retry via pub-addr, if different*/)
if err == nil {
break
}
timeout = config.Timeout.CplaneOperation.D()
}
if err == nil {
// move back to idle
query := url.Values{apc.QparamAskPrimary: []string{"true"}}

// retry via pub-addr if different (compare with palive.retry)
_, _, err = p.reqHealth(curPrimary, timeout, query /*ask primary*/, smap, 1 /*retry*/)
_, _, err = p.reqHealth(curPrimary, tout, query /*ask primary*/, smap, true /* ditto */)

if err == nil {
nlog.Infoln(pnameC, "the current primary", curName, "is up, moving back to idle")
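In elect(), the proxy first pings the current primary several times (retrying via its public address if it differs, elided here) and, if the primary responds, confirms with an `apc.QparamAskPrimary` query before giving up on the election. A simplified sketch of that ping-then-confirm decision; all probes below are simulated:

```go
// Ping the current primary a few times; if it answers, double-check that it
// still considers itself primary before abandoning the election.
package main

import (
	"errors"
	"fmt"
)

const retryCurPrimary = 3

var reachable, claimsPrimary = true, true // simulated state of the current primary

func ping(askPrimary bool) error {
	if !reachable {
		return errors.New("no route to current primary")
	}
	if askPrimary && !claimsPrimary {
		return errors.New("node is up but no longer primary")
	}
	return nil
}

func elect() {
	var err error
	for i := 0; i < retryCurPrimary; i++ {
		if err = ping(false); err == nil {
			break
		}
	}
	if err == nil {
		// primary answered - confirm it still considers itself primary
		if err = ping(true /*ask primary*/); err == nil {
			fmt.Println("current primary is up, moving back to idle")
			return
		}
	}
	fmt.Println("current primary is down:", err, "- proceeding with the election")
}

func main() { elect() }
```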
17 changes: 9 additions & 8 deletions cmd/cli/cli/log_hdlr.go
@@ -25,14 +25,15 @@ import (
const clusterCompletion = "cluster"

// (compare with getCluLogUsage)
const getLogUsage = "download the current log or entire log history from a selected node or all nodes, e.g.:\n" +
indent4 + "\t - 'ais log get NODE_ID /tmp' - download the specified node's current log; save the result to the specified directory;\n" +
indent4 + "\t - 'ais log get NODE_ID /tmp/out --refresh 10' - download the current log as /tmp/out\n" +
indent4 + "\t keep updating (ie., appending) the latter every 10s;\n" +
indent4 + "\t - 'ais log get cluster /tmp' - download TAR.GZ archived logs from _all_ nodes in the cluster\n" +
indent4 + "\t (note that 'cluster' implies '--all'), and save the result to the specified destination;\n" +
indent4 + "\t - 'ais log get NODE_ID --all' - download the node's TAR.GZ log archive\n" +
indent4 + "\t - 'ais log get NODE_ID --all --severity e' - TAR.GZ archive of (only) logged errors and warnings"
const getLogUsage = "download the current log or entire log history from: a) selected node, or b) entire cluster,\n" +
indent1 + "\t e.g.:\n" +
indent1 + "\t - 'ais log get NODE_ID /tmp'\t- download the specified node's current log and save it in the specified directory;\n" +
indent1 + "\t - 'ais log get NODE_ID /tmp/out --refresh 10'\t- download the node's current log _as_ /tmp/out\n" +
indent1 + "\t \t and keep updating (ie., appending) the latter every 10s;\n" +
indent1 + "\t - 'ais log get cluster /tmp'\t-\tdownload TAR.GZ archived logs of _all_ nodes in the cluster\n" +
indent1 + "\t \t and save the result in the specified local directory (note that 'get cluster' implies '--all');\n" +
indent1 + "\t - 'ais log get NODE_ID --all'\t- given 'NODE-ID' download the node's entire log TAR.GZ archive\n" +
indent1 + "\t - 'ais log get NODE_ID --all --severity e'\t- archive logged errors and warnings"

var (
nodeLogFlags = map[string][]cli.Flag{
2 changes: 1 addition & 1 deletion cmn/config.go
@@ -1216,7 +1216,7 @@ func (c *KeepaliveConf) Validate() (err error) {

func KeepaliveRetryDuration(c *Config) time.Duration {
d := c.Timeout.CplaneOperation.D() * time.Duration(c.Keepalive.RetryFactor)
return min(d, c.Timeout.MaxKeepalive.D()+time.Second/2)
return min(d, c.Timeout.MaxKeepalive.D()+time.Second)
}

/////////////
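The last hunk relaxes the cap on the keepalive retry duration from `MaxKeepalive + 0.5s` to `MaxKeepalive + 1s`. A small worked example; the timeout values below are assumptions for illustration, not AIStore defaults:

```go
// KeepaliveRetryDuration = min(CplaneOperation * RetryFactor, MaxKeepalive + 1s)
// Requires Go 1.21+ for the min builtin.
package main

import (
	"fmt"
	"time"
)

func keepaliveRetryDuration(cplane, maxKalive time.Duration, retryFactor int) time.Duration {
	d := cplane * time.Duration(retryFactor)
	return min(d, maxKalive+time.Second)
}

func main() {
	cplane, maxKalive, factor := 2*time.Second, 4*time.Second, 3
	// d = 2s * 3 = 6s; cap = 4s + 1s = 5s; result = 5s (would have been 4.5s before this change)
	fmt.Println(keepaliveRetryDuration(cplane, maxKalive, factor))
}
```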
