new RMD _not_ to trigger rebalance when disabled in the config
* several distinct scenarios: by user, RMD with action message,
  RMD without message (ref)
* extra check in the latter case
* still trusting local copy of the cluster config, though

Signed-off-by: Alex Aizman <[email protected]>
alex-aizman committed Sep 19, 2024
1 parent 3eaf228 commit 550cade
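
The gist of the new flow, in a simplified and self-contained Go sketch: the types, constants, and helpers below (msg, rebMD, getConfig, runRebalance) are illustrative stand-ins rather than the actual AIStore identifiers, and the sketch collapses the goroutine and Smap re-check that the committed code performs.

package main

import (
	"fmt"
	"runtime"
)

// Simplified stand-ins for aisMsg, rebMD, and the locally cached cluster config.
type msg struct {
	Action string
	UUID   string
}

type rebMD struct {
	Version int64
}

type config struct {
	RebalanceEnabled bool
}

const (
	actRebalance  = "rebalance"         // stands in for apc.ActRebalance
	actStartMaint = "start-maintenance" // stands in for apc.ActStartMaintenance and friends
)

// getConfig stands in for cmn.GCO.Get(): the target's local copy of the cluster config.
var getConfig = func() config { return config{RebalanceEnabled: false} }

func runRebalance(version int64) { fmt.Println("running rebalance, RMD version", version) }

// runRe mirrors the three scenarios from the commit message.
func runRe(newRMD rebMD, m msg) {
	// 1. user-requested rebalance: always run
	if m.Action == actRebalance {
		runRebalance(newRMD.Version)
		return
	}
	switch m.Action {
	// 2.1. RMD arrives with a node-removal action: run
	case actStartMaint:
		runRebalance(newRMD.Version)
	// 2.2. "pure" RMD with no action: double-check the (local) cluster config
	default:
		if getConfig().RebalanceEnabled {
			runRebalance(newRMD.Version)
			return
		}
		// yield once and re-read the config before giving up,
		// in case a concurrent config update is still being applied
		runtime.Gosched()
		if getConfig().RebalanceEnabled {
			runRebalance(newRMD.Version)
			return
		}
		fmt.Println("not starting rebalance: disabled in the cluster config")
	}
}

func main() {
	runRe(rebMD{Version: 2}, msg{})                     // no action, rebalance disabled => skipped
	runRe(rebMD{Version: 3}, msg{Action: actRebalance}) // user-requested => runs
}

In the committed code the disabled path also re-verifies, inside a spawned goroutine, that this target is still present in the Smap before deciding; either way, a plain metasync'ed RMD no longer triggers rebalance while the locally cached cluster config has it disabled.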
Showing 2 changed files with 82 additions and 37 deletions.
117 changes: 81 additions & 36 deletions ais/tgtcp.go
@@ -10,6 +10,7 @@ import (
"net/http"
"net/url"
"path/filepath"
"runtime"
"strconv"
"sync"
"time"
@@ -891,42 +892,7 @@ func (t *target) receiveRMD(newRMD *rebMD, msg *aisMsg) (err error) {
		}
	}
	if !t.regstate.disabled.Load() {
		//
		// run rebalance
		//
		notif := &xact.NotifXact{
			Base: nl.Base{When: core.UponTerm, Dsts: []string{equalIC}, F: t.notifyTerm},
		}
		if msg.Action == apc.ActRebalance {
			nlog.Infof("%s: starting user-requested rebalance[%s]", t, msg.UUID)
			go t.reb.RunRebalance(&smap.Smap, newRMD.Version, notif, t.statsT)
			return
		}

		switch msg.Action {
		case apc.ActStartMaintenance, apc.ActDecommissionNode, apc.ActShutdownNode, apc.ActRmNodeUnsafe:
			var opts apc.ActValRmNode
			if err := cos.MorphMarshal(msg.Value, &opts); err != nil {
				debug.AssertNoErr(err) // unlikely
			} else {
				var s string
				if opts.DaemonID == t.SID() {
					s = " (to subsequently deactivate or remove _this_ target)"
				}
				nlog.Infof("%s: starting '%s' triggered rebalance[%s]%s: %+v",
					t, msg.Action, xact.RebID2S(newRMD.Version), s, opts)
			}
		default:
			nlog.Infoln(t.String() + ": starting rebalance[" + xact.RebID2S(newRMD.Version) + "]")
		}
		go t.reb.RunRebalance(&smap.Smap, newRMD.Version, notif, t.statsT)

		if newRMD.Resilver != "" {
			nlog.Infoln(t.String() + ": ... and resilver")
			go t.runResilver(res.Args{UUID: newRMD.Resilver, SkipGlobMisplaced: true}, nil /*wg*/)
		}
		t.owner.rmd.put(newRMD)
		// TODO: move and refactor
		t._runRe(newRMD, msg, smap)
	} else if msg.Action == apc.ActAdminJoinTarget && daemon.cli.target.standby && msg.Name == t.SID() {
		nlog.Warningln(t.String()+": standby => join", msg.String())
		if _, err = t.joinCluster(msg.Action); err == nil {
@@ -937,6 +903,85 @@ func (t *target) receiveRMD(newRMD *rebMD, msg *aisMsg) (err error) {
	return
}

func (t *target) _runRe(newRMD *rebMD, msg *aisMsg, smap *smapX) {
	const tag = "rebalance["
	notif := &xact.NotifXact{
		Base: nl.Base{When: core.UponTerm, Dsts: []string{equalIC}, F: t.notifyTerm},
	}

	// 1. by user
	if msg.Action == apc.ActRebalance {
		xname := tag + msg.UUID + "]"
		nlog.Infoln(t.String(), "starting user-requested", t, xname)

		// (##a)
		go t.reb.RunRebalance(&smap.Smap, newRMD.Version, notif, t.statsT)
		return
	}

	// 2. by RMD
	xname := tag + xact.RebID2S(newRMD.Version) + "]"

	switch msg.Action {
	// 2.1. action => metasync(newRMD)
	case apc.ActStartMaintenance, apc.ActDecommissionNode, apc.ActShutdownNode, apc.ActRmNodeUnsafe:
		var opts apc.ActValRmNode
		if err := cos.MorphMarshal(msg.Value, &opts); err != nil {
			debug.AssertNoErr(err) // unlikely
		} else {
			var s string
			if opts.DaemonID == t.SID() {
				s = " (to subsequently deactivate or remove _this_ target)"
			}
			nlog.Infof("%s: starting '%s' triggered %s%s: %+v", t, msg.Action, xname, s, opts)
		}
		// (##b)
		go t.reb.RunRebalance(&smap.Smap, newRMD.Version, notif, t.statsT)

	// 2.2. "pure" metasync(newRMD) w/ no action - double-check with cluster config
	default:
		config := cmn.GCO.Get()
		debug.Assert(config.Version > 0 && config.UUID == smap.UUID, config.String(), " vs ", smap.StringEx())

		if config.Rebalance.Enabled {
			nlog.Infoln(t.String(), "starting", xname)
			// (##c)
			go t.reb.RunRebalance(&smap.Smap, newRMD.Version, notif, t.statsT)
		} else {
			runtime.Gosched()

			go func() {
				smap := t.owner.smap.get()
				if smap.GetNode(t.SID()) == nil {
					nlog.Errorf(fmtSelfNotPresent, t, smap.StringEx())
					return
				}
				config := cmn.GCO.Get()
				if !config.Rebalance.Enabled {
					//
					// NOTE: trusting local copy of the config, _not_ checking with primary via reqHealth()
					//
					nlog.Warningln(t.String(), "not starting", xname, "- disabled in the", config.String())
					return
				}

				// (##d)
				nlog.Infoln(t.String(), "starting", xname)
				t.reb.RunRebalance(&smap.Smap, newRMD.Version, notif, t.statsT)
			}()
		}
	}

	if newRMD.Resilver != "" {
		nlog.Infoln(t.String(), "... and resilver")

		// (##resilver)
		go t.runResilver(res.Args{UUID: newRMD.Resilver, SkipGlobMisplaced: true}, nil /*wg*/)
	}

	t.owner.rmd.put(newRMD)
}

func (t *target) ensureLatestBMD(msg *aisMsg, r *http.Request) {
	bmd, bmdVersion := t.owner.bmd.Get(), msg.BMDVersion
	if bmd.Version < bmdVersion {
2 changes: 1 addition & 1 deletion core/meta/rmd.go
@@ -5,7 +5,7 @@
package meta

type (
	// RMD (Rebalance MetaData)
	// Rebalance MetaData
	RMD struct {
		Ext any `json:"ext,omitempty"` // within meta-version extensions
		CluID string `json:"cluster_id"` // effectively, Smap.UUID
