Skip to content

Commit

Permalink
filesystem health checker (fshc) version 2
Browse files Browse the repository at this point in the history
* add disk-fault alert
* `<DISK NAME>[<alert>]` convention, with suffix enumeration in fs/api.go
* part two

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jul 3, 2024
1 parent 252f952 commit 0319347
Show file tree
Hide file tree
Showing 15 changed files with 135 additions and 35 deletions.
4 changes: 2 additions & 2 deletions ais/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,7 @@ func (t *target) httpobjput(w http.ResponseWriter, r *http.Request, apireq *apiR
}
cs := fs.Cap()
if errCap := cs.Err(); errCap != nil || cs.PctMax > int32(config.Space.CleanupWM) {
cs = t.OOS(nil)
cs = t.oos(config)
if cs.IsOOS() {
// fail this write
t.writeErr(w, r, errCap, http.StatusInsufficientStorage)
Expand Down Expand Up @@ -1440,7 +1440,7 @@ func (t *target) blobdl(params *core.BlobParams, oa *cmn.ObjAttrs) (string, *xs.
// cap
cs := fs.Cap()
if errCap := cs.Err(); errCap != nil {
cs = t.OOS(nil)
cs = t.oos(nil)
if err := cs.Err(); err != nil {
return "", nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion ais/tgtcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ func (t *target) _postBMD(newBMD *bucketMD, tag string, rmbcks []*meta.Bck) {
// since some buckets may have been destroyed
cs := fs.Cap()
if cs.Err() != nil {
_ = t.OOS(nil)
_ = t.oos(nil)
}
}

Expand Down
9 changes: 5 additions & 4 deletions ais/tgtfshc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import (
)

func (t *target) FSHC(err error, mi *fs.Mountpath, fqn string) {
if !cmn.GCO.Get().FSHC.Enabled {
config := cmn.GCO.Get()
if !config.FSHC.Enabled {
return
}
if !cos.IsIOError(err) { // TODO -- FIXME: review the selection
debug.Assert(!cos.IsErrOOS(err)) // is checked below
return
}

Expand All @@ -44,7 +46,7 @@ func (t *target) FSHC(err error, mi *fs.Mountpath, fqn string) {
}

if cos.IsErrOOS(err) {
cs := t.OOS(nil)
cs := t.oos(config)
nlog.Errorf("%s: OOS (%s), not %s", t, cs.String(), s)
return
}
Expand All @@ -67,7 +69,6 @@ func (t *target) FSHC(err error, mi *fs.Mountpath, fqn string) {
func (t *target) DisableMpath(mi *fs.Mountpath) (err error) {
_, err = t.fsprg.disableMpath(mi.Path, true /*dont-resilver*/)

// TODO -- FIXME: Prometheus alert

t.statsT.Flag(stats.NodeStateFlags, cos.DiskFault, 0)
return err
}
9 changes: 6 additions & 3 deletions ais/tgtspace.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Package ais provides core functionality for the AIStore object storage.
/*
* Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
*/
package ais

Expand Down Expand Up @@ -36,14 +36,17 @@ var (
)

// triggers by an out-of-space condition or a suspicion of thereof
func (t *target) OOS(csRefreshed *fs.CapStatus) (cs fs.CapStatus) {

func (t *target) oos(config *cmn.Config) fs.CapStatus { return t.OOS(nil, config, nil) }

func (t *target) OOS(csRefreshed *fs.CapStatus, config *cmn.Config, tcdf *fs.TargetCDF) (cs fs.CapStatus) {
var errCap error
if csRefreshed != nil {
cs = *csRefreshed
errCap = cs.Err()
} else {
var err error
cs, err, errCap = fs.CapRefresh(nil, nil)
cs, err, errCap = fs.CapRefresh(config, tcdf)
if err != nil {
nlog.Errorln(t.String(), "failed to update capacity stats:", err)
return
Expand Down
2 changes: 1 addition & 1 deletion ais/tgttxn.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (t *target) txnHandler(w http.ResponseWriter, r *http.Request) {
t.transactions.find(c.uuid, ActCleanup)

if cmn.IsErrCapExceeded(err) {
cs := t.OOS(nil)
cs := t.oos(nil)
t.writeErrStatusf(w, r, http.StatusInsufficientStorage, "%s: %v", cs.String(), err)
} else {
t.writeErr(w, r, err)
Expand Down
1 change: 1 addition & 0 deletions cmn/cos/bitflags.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func (f BitFlags) IsAnySet(flags BitFlags) bool {

// atomic

// "set" as in: "add"
func SetfAtomic(f *uint64, flags uint64) (ok bool) {
return atomic.CompareAndSwapUint64(f, *f, *f|flags)
}
Expand Down
2 changes: 1 addition & 1 deletion cmn/cos/node_state_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const (
MaintenanceMode // warning
LowCapacity // (used > high); warning: OOS possible soon..
LowMemory // ditto OOM
DiskFault // red & critical // TODO -- FIXME: NIY
DiskFault // red
)

func (f NodeStateFlags) IsSet(flag NodeStateFlags) bool { return BitFlags(f).IsSet(BitFlags(flag)) }
Expand Down
2 changes: 1 addition & 1 deletion cmn/err.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ func (e *ErrCapExceeded) Error() string {

func IsErrCapExceeded(err error) bool {
_, ok := err.(*ErrCapExceeded)
return ok
return ok || cos.IsErrOOS(err)
}

// ErrInvalidCksum
Expand Down
5 changes: 4 additions & 1 deletion core/mock/target_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ func (*TargetMock) Backend(*meta.Bck) core.Backend
func (*TargetMock) HeadObjT2T(*core.LOM, *meta.Snode) bool { return false }
func (*TargetMock) BMDVersionFixup(*http.Request, ...cmn.Bck) {}
func (*TargetMock) FSHC(error, *fs.Mountpath, string) {}
func (*TargetMock) OOS(*fs.CapStatus) fs.CapStatus { return fs.CapStatus{} }

func (*TargetMock) OOS(*fs.CapStatus, *cmn.Config, *fs.TargetCDF) fs.CapStatus {
return fs.CapStatus{}
}

func (*TargetMock) CopyObject(*core.LOM, core.DM, *core.CopyParams) (int64, error) {
return 0, nil
Expand Down
2 changes: 1 addition & 1 deletion core/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type (
Node

// Space
OOS(*fs.CapStatus) fs.CapStatus
OOS(*fs.CapStatus, *cmn.Config, *fs.TargetCDF) fs.CapStatus

// xactions (jobs) now
GetAllRunning(inout *AllRunningInOut, periodic bool)
Expand Down
47 changes: 45 additions & 2 deletions fs/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,23 @@
package fs

import (
"strings"

"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/ios"
)

// disk name suffix (see HasAlert below)
const (
DiskFaulted = "(faulted)" // disabled by FSHC
DiskOOS = "(out-of-space)" // (capacity)
DiskDisable = "(->disabled)" // FlagBeingDisabled (in transition)
DiskDetach = "(->detach)" // FlagBeingDetached (ditto)
DiskHighWM = "(low-on-space)" // (capacity)

// NOTE: when adding/updating see "must contain" below
)

type (
Capacity struct {
Used uint64 `json:"used,string"` // bytes
Expand All @@ -18,7 +31,7 @@ type (
// Capacity, Disks, Filesystem (CDF)
CDF struct {
Capacity
Disks []string `json:"disks"` // owned or shared disks (ios.FsDisks map => slice)
Disks []string `json:"disks"` // owned or shared disks (ios.FsDisks map => slice); "name[.faulted | degraded]"
Label ios.Label `json:"mountpath_label"`
FS cos.FS `json:"fs"`
}
Expand All @@ -30,7 +43,7 @@ type (
PctMax int32 `json:"pct_max"` // max used (%)
PctAvg int32 `json:"pct_avg"` // avg used (%)
PctMin int32 `json:"pct_min"` // min used (%)
CsErr string `json:"cs_err"` // OOS or high-wm error message
CsErr string `json:"cs_err"` // OOS or high-wm error message; disk fault
}
)

Expand All @@ -57,3 +70,33 @@ func InitCDF(tcdf *TargetCDF) {
tcdf.Mountpaths[mpath] = &CDF{}
}
}

func (tcdf *TargetCDF) HasAlerts() bool {
for _, cdf := range tcdf.Mountpaths {
if alert, _ := cdf.HasAlert(); alert != "" {
return true
}
}
return false
}

// Returns "" and (-1) when no alerts found;
// otherwise, returns alert name and its index in the string, which is formatted as follows:
// <DISK-NAME>[<ALERT-NAME>]
func (cdf *CDF) HasAlert() (alert string, idx int) {
var alerts = []string{DiskFaulted, DiskOOS, DiskDisable, DiskDetach, DiskHighWM} // NOTE: must contain all flags
for _, disk := range cdf.Disks {
for _, a := range alerts {
if idx = strings.Index(disk, a); idx > 0 {
return disk[idx:], idx
}
}
}
return "", -1
}

func (cdf *CDF) _alert(a string) {
for i, d := range cdf.Disks {
cdf.Disks[i] = d + a
}
}
21 changes: 18 additions & 3 deletions fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func NewMountpath(mpath string, label ios.Label) (*Mountpath, error) {
}

// flags
func (mi *Mountpath) setFlags(flags uint64) (ok bool) {
func (mi *Mountpath) SetFlags(flags uint64) (ok bool) {
return cos.SetfAtomic(&mi.flags, flags)
}

Expand Down Expand Up @@ -365,7 +365,7 @@ func (mi *Mountpath) AddEnabled(tid string, avail MPI, config *cmn.Config) (err
if err = mi._addEnabled(tid, avail, config); err == nil {
mfs.fsIDs[mi.FsID] = mi.Path
}
cos.ClearfAtomic(&mi.flags, FlagWaitingDD)
cos.ClearfAtomic(&mi.flags, FlagWaitingDD|FlagDisabledByFSHC)
return
}

Expand Down Expand Up @@ -779,7 +779,7 @@ func begdd(action string, flags uint64, mpath string) (mi *Mountpath, numAvail i
// dd active
clone := _cloneOne(avail)
mi = clone[mpath]
ok := mi.setFlags(flags)
ok := mi.SetFlags(flags)
debug.Assert(ok, mi.String()) // under lock
putAvailMPI(clone)
numAvail = len(clone) - 1
Expand Down Expand Up @@ -1150,6 +1150,21 @@ func CapRefresh(config *cmn.Config, tcdf *TargetCDF) (cs CapStatus, _, errCap er
if tcdf != nil {
cdf := mi._cdf(tcdf)
cdf.Capacity = c

// add alerts
// (not mutually exclusive, but we add only one here in order of priority)
switch {
case mi.IsAnySet(FlagDisabledByFSHC):
cdf._alert(DiskFaulted)
case c.PctUsed > int32(config.Space.OOS):
cdf._alert(DiskOOS)
case mi.IsAnySet(FlagBeingDetached):
cdf._alert(DiskDetach)
case mi.IsAnySet(FlagBeingDisabled):
cdf._alert(DiskDisable)
case c.PctUsed >= int32(config.Space.HighWM):
cdf._alert(DiskHighWM)
}
}

// recompute totals
Expand Down
1 change: 1 addition & 0 deletions fs/health/fshc.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (f *FSHC) run(mi *fs.Mountpath, fqn string) {
nlog.Errorf("%s: failed to disable, err: %v", mi, err)
} else {
nlog.Infoln(mi.String(), "now disabled")
mi.SetFlags(fs.FlagDisabledByFSHC)
}
}

Expand Down
5 changes: 5 additions & 0 deletions stats/common_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,11 @@ func (r *runner) Name() string { return r.name }

func (r *runner) Get(name string) (val int64) { return r.core.get(name) }

func (r *runner) nodeStateFlags() cos.NodeStateFlags {
val := r.Get(NodeStateFlags)
return cos.NodeStateFlags(val)
}

func (r *runner) _run(logger statsLogger /* Prunner or Trunner */) error {
var (
i, j, k time.Duration
Expand Down
Loading

0 comments on commit 0319347

Please sign in to comment.