From 550cade20b272cef5d6c12c748d3629beba8c8c2 Mon Sep 17 00:00:00 2001
From: Alex Aizman
Date: Thu, 19 Sep 2024 09:26:49 -0400
Subject: [PATCH] new RMD _not_ to trigger rebalance when disabled in the config

* several distinct scenarios: by user, RMD with action message, RMD without message (ref)
* extra check in the latter case
* still trusting local copy of the cluster config, though

Signed-off-by: Alex Aizman
---
 ais/tgtcp.go     | 126 ++++++++++++++++++++++++++++++++---------------
 core/meta/rmd.go |   2 +-
 2 files changed, 91 insertions(+), 37 deletions(-)

diff --git a/ais/tgtcp.go b/ais/tgtcp.go
index 4e3df5d2d47..7671553f100 100644
--- a/ais/tgtcp.go
+++ b/ais/tgtcp.go
@@ -10,6 +10,7 @@ import (
 	"net/http"
 	"net/url"
 	"path/filepath"
+	"runtime"
 	"strconv"
 	"sync"
 	"time"
@@ -891,42 +892,7 @@ func (t *target) receiveRMD(newRMD *rebMD, msg *aisMsg) (err error) {
 		}
 	}
 	if !t.regstate.disabled.Load() {
-		//
-		// run rebalance
-		//
-		notif := &xact.NotifXact{
-			Base: nl.Base{When: core.UponTerm, Dsts: []string{equalIC}, F: t.notifyTerm},
-		}
-		if msg.Action == apc.ActRebalance {
-			nlog.Infof("%s: starting user-requested rebalance[%s]", t, msg.UUID)
-			go t.reb.RunRebalance(&smap.Smap, newRMD.Version, notif, t.statsT)
-			return
-		}
-
-		switch msg.Action {
-		case apc.ActStartMaintenance, apc.ActDecommissionNode, apc.ActShutdownNode, apc.ActRmNodeUnsafe:
-			var opts apc.ActValRmNode
-			if err := cos.MorphMarshal(msg.Value, &opts); err != nil {
-				debug.AssertNoErr(err) // unlikely
-			} else {
-				var s string
-				if opts.DaemonID == t.SID() {
-					s = " (to subsequently deactivate or remove _this_ target)"
-				}
-				nlog.Infof("%s: starting '%s' triggered rebalance[%s]%s: %+v",
-					t, msg.Action, xact.RebID2S(newRMD.Version), s, opts)
-			}
-		default:
-			nlog.Infoln(t.String() + ": starting rebalance[" + xact.RebID2S(newRMD.Version) + "]")
-		}
-		go t.reb.RunRebalance(&smap.Smap, newRMD.Version, notif, t.statsT)
-
-		if newRMD.Resilver != "" {
-			nlog.Infoln(t.String() + ": ... and resilver")
-			go t.runResilver(res.Args{UUID: newRMD.Resilver, SkipGlobMisplaced: true}, nil /*wg*/)
-		}
-		t.owner.rmd.put(newRMD)
-		// TODO: move and refactor
+		t._runRe(newRMD, msg, smap)
 	} else if msg.Action == apc.ActAdminJoinTarget && daemon.cli.target.standby && msg.Name == t.SID() {
 		nlog.Warningln(t.String()+": standby => join", msg.String())
 		if _, err = t.joinCluster(msg.Action); err == nil {
@@ -937,6 +903,94 @@ func (t *target) receiveRMD(newRMD *rebMD, msg *aisMsg) (err error) {
 	return
 }
 
+// _runRe is called by receiveRMD (when this target is not disabled) to start
+// rebalance for the newly received RMD. User-requested rebalance (apc.ActRebalance)
+// starts right away and returns. Otherwise, RMD that arrives with one of the
+// maintenance/decommission/shutdown/remove-node actions starts rebalance as well,
+// while RMD with any other (or no) action starts it only if rebalance is enabled in
+// the local copy of the cluster config. In the latter two cases the method then runs
+// resilver, if newRMD.Resilver is set, and stores newRMD via t.owner.rmd.put.
+func (t *target) _runRe(newRMD *rebMD, msg *aisMsg, smap *smapX) {
+	const tag = "rebalance["
+	notif := &xact.NotifXact{
+		Base: nl.Base{When: core.UponTerm, Dsts: []string{equalIC}, F: t.notifyTerm},
+	}
+
+	// 1. by user
+	if msg.Action == apc.ActRebalance {
+		xname := tag + msg.UUID + "]"
+		nlog.Infoln(t.String(), "starting user-requested", xname)
+
+		// (##a)
+		// NOTE: returning right away - no resilver and no t.owner.rmd.put(newRMD) on this path
+		go t.reb.RunRebalance(&smap.Smap, newRMD.Version, notif, t.statsT)
+		return
+	}
+
+	// 2. by RMD
+	xname := tag + xact.RebID2S(newRMD.Version) + "]"
+
+	switch msg.Action {
+	// 2.1. action => metasync(newRMD)
+	case apc.ActStartMaintenance, apc.ActDecommissionNode, apc.ActShutdownNode, apc.ActRmNodeUnsafe:
+		var opts apc.ActValRmNode
+		if err := cos.MorphMarshal(msg.Value, &opts); err != nil {
+			debug.AssertNoErr(err) // unlikely
+		} else {
+			var s string
+			if opts.DaemonID == t.SID() {
+				s = " (to subsequently deactivate or remove _this_ target)"
+			}
+			nlog.Infof("%s: starting '%s' triggered %s%s: %+v", t, msg.Action, xname, s, opts)
+		}
+		// (##b)
+		go t.reb.RunRebalance(&smap.Smap, newRMD.Version, notif, t.statsT)
+
+	// 2.2. "pure" metasync(newRMD) w/ no action - double-check with cluster config
+	default:
+		config := cmn.GCO.Get()
+		debug.Assert(config.Version > 0 && config.UUID == smap.UUID, config.String(), " vs ", smap.StringEx())
+
+		if config.Rebalance.Enabled {
+			nlog.Infoln(t.String(), "starting", xname)
+			// (##c)
+			go t.reb.RunRebalance(&smap.Smap, newRMD.Version, notif, t.statsT)
+		} else {
+			// disabled: yield, then re-check asynchronously, re-reading both Smap and config
+			runtime.Gosched()
+
+			go func() {
+				smap := t.owner.smap.get()
+				if smap.GetNode(t.SID()) == nil {
+					nlog.Errorf(fmtSelfNotPresent, t, smap.StringEx())
+					return
+				}
+				config := cmn.GCO.Get()
+				if !config.Rebalance.Enabled {
+					//
+					// NOTE: trusting local copy of the config, _not_ checking with primary via reqHealth()
+					//
+					nlog.Warningln(t.String(), "not starting", xname, "- disabled in the", config.String())
+					return
+				}
+
+				// (##d)
+				nlog.Infoln(t.String(), "starting", xname)
+				t.reb.RunRebalance(&smap.Smap, newRMD.Version, notif, t.statsT)
+			}()
+		}
+	}
+
+	if newRMD.Resilver != "" {
+		nlog.Infoln(t.String(), "... and resilver")
+
+		// (##resilver)
+		go t.runResilver(res.Args{UUID: newRMD.Resilver, SkipGlobMisplaced: true}, nil /*wg*/)
+	}
+
+	t.owner.rmd.put(newRMD)
+}
+
 func (t *target) ensureLatestBMD(msg *aisMsg, r *http.Request) {
 	bmd, bmdVersion := t.owner.bmd.Get(), msg.BMDVersion
 	if bmd.Version < bmdVersion {
diff --git a/core/meta/rmd.go b/core/meta/rmd.go
index aedeec2614a..33151c69ead 100644
--- a/core/meta/rmd.go
+++ b/core/meta/rmd.go
@@ -5,7 +5,7 @@
 package meta
 
 type (
-	// RMD (Rebalance MetaData)
+	// Rebalance MetaData
 	RMD struct {
 		Ext   any    `json:"ext,omitempty"` // within meta-version extensions
 		CluID string `json:"cluster_id"`    // effectively, Smap.UUID