Skip to content

Commit

Permalink
filesystem health checker (fshc) version 2
Browse files Browse the repository at this point in the history
* add 'err-mountpath-changed-at-runtime'
  - list-objects will now detect it and trigger FSHC
  - TODO: consider making the check inside `Get()` and `GetAvail()`
* part eight, prev. commit: 9ba4f97

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jul 15, 2024
1 parent 1fe096c commit e0a312c
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 15 deletions.
2 changes: 1 addition & 1 deletion cmn/cos/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func NewCallbackReadOpenCloser(r ReadOpenCloser, readCb func(int, error), report

func (r *CallbackROC) Read(p []byte) (n int, err error) {
n, err = r.roc.Read(p)
debug.Assert(r.readBytes > math.MaxInt-n)
debug.Assert(r.readBytes < math.MaxInt-n)
r.readBytes += n
if r.readBytes > r.reportedBytes {
diff := r.readBytes - r.reportedBytes
Expand Down
18 changes: 18 additions & 0 deletions cmn/err.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ type (
disks []string
fsdisks []string
}
ErrMountpathChangeRT struct {
err error
}

ErrInvalidFSPathsConf struct {
err error
Expand Down Expand Up @@ -583,6 +586,21 @@ func IsErrMountpathNewDisk(err error) bool {
return ok
}

// ErrMountpathChangeRT

// NewErrMountpathChangeRT returns a new *ErrMountpathChangeRT that carries err
// as its underlying cause. The given err is stored as is (no copy, no nil check).
func NewErrMountpathChangeRT(err error) *ErrMountpathChangeRT {
	e := new(ErrMountpathChangeRT)
	e.err = err
	return e
}

// Error implements the error interface: the message is exactly the message of
// the wrapped error, with nothing added.
func (e *ErrMountpathChangeRT) Error() string {
	wrapped := e.err
	return wrapped.Error()
}

// IsErrMountpathChangeRT reports whether the dynamic type of err is
// *ErrMountpathChangeRT. This is a direct type check: an error that merely
// wraps an *ErrMountpathChangeRT is not matched, and a nil err yields false.
func IsErrMountpathChangeRT(err error) bool {
	switch err.(type) {
	case *ErrMountpathChangeRT:
		return true
	default:
		return false
	}
}

// ErrInvalidFSPathsConf

func NewErrInvalidFSPathsConf(err error) *ErrInvalidFSPathsConf {
Expand Down
13 changes: 12 additions & 1 deletion fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,21 @@ func NewMountpath(mpath string, label ios.Label) (*Mountpath, error) {
Label: label,
PathDigest: xxhash.Checksum64S(cos.UnsafeB(cleanMpath), cos.MLCG32),
}
err = mi.ResolveFS()
err = mi.resolveFS()
return mi, err
}

// CheckFS re-resolves the filesystem at mi.Path and compares the result with
// the filesystem recorded in mi.FS. Resolution is done on a scratch Mountpath
// (same Path), so the receiver is not updated by this call.
//
// Returns:
//   - the error from resolveFS, unchanged, when resolution fails;
//   - a descriptive "detected filesystem change at runtime" error when the
//     freshly resolved filesystem is not Equal to mi.FS;
//   - nil otherwise.
func (mi *Mountpath) CheckFS() error {
	dup := Mountpath{Path: mi.Path}
	if err := dup.resolveFS(); err != nil {
		return err
	}
	if dup.FS.Equal(mi.FS) {
		return nil
	}
	// recorded (old) filesystem first, currently resolved (new) one second
	return fmt.Errorf("%s: detected filesystem change at runtime: %s => %s", mi, mi.FS.String(), dup.FS.String())
}

// flags
func (mi *Mountpath) SetFlags(flags uint64) (ok bool) {
return cos.SetfAtomic(&mi.flags, flags)
Expand Down
2 changes: 1 addition & 1 deletion fs/fs_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"syscall"
)

func (mi *Mountpath) ResolveFS() error {
func (mi *Mountpath) resolveFS() error {
var fsStats syscall.Statfs_t
if err := syscall.Statfs(mi.Path, &fsStats); err != nil {
return fmt.Errorf("cannot statfs fspath %q, err: %w", mi.Path, err)
Expand Down
2 changes: 1 addition & 1 deletion fs/fs_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"syscall"
)

func (mi *Mountpath) ResolveFS() error {
func (mi *Mountpath) resolveFS() error {
var fsStats syscall.Statfs_t
if err := syscall.Statfs(mi.Path, &fsStats); err != nil {
return fmt.Errorf("cannot statfs fspath %q, err: %w", mi.Path, err)
Expand Down
9 changes: 1 addition & 8 deletions fs/health/fshc.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ func (f *FSHC) run(mi *fs.Mountpath, fqn string) {
serr string
rerrs, werrs int
cfg = cmn.GCO.Get().FSHC
dup = fs.Mountpath{Path: mi.Path}
)
// 1. fstat
err := cos.Stat(mi.Path)
Expand All @@ -65,13 +64,7 @@ func (f *FSHC) run(mi *fs.Mountpath, fqn string) {
}

// 2. resolve FS
err = dup.ResolveFS()
if err == nil {
if !dup.FS.Equal(mi.FS) {
err = fmt.Errorf("%s: detected filesystem change (%s => %s) at runtime", mi, mi.FS.String(), dup.FS.String())
}
}
if err != nil {
if err = mi.CheckFS(); err != nil {
nlog.Errorln(err)
goto disable
}
Expand Down
2 changes: 1 addition & 1 deletion fs/health/serialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type ror struct {
var all sync.Map // [mpath => ror]

func (*FSHC) IsErr(err error) bool {
return cmn.IsErrGetCap(err) || cos.IsIOError(err)
return cmn.IsErrGetCap(err) || cmn.IsErrMountpathChangeRT(err) || cos.IsIOError(err)
}

func (f *FSHC) OnErr(mi *fs.Mountpath, fqn string) {
Expand Down
11 changes: 9 additions & 2 deletions fs/walkbck.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/cmn/nlog"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -43,6 +44,7 @@ type (
wbeHeap []wbeInfo
)

// WalkBck is used by list-objects (lso) and by tests
func WalkBck(opts *WalkBckOpts) error {
debug.Assert(opts.Mi == nil && opts.Sorted) // TODO: support `opts.Sorted == false`
var (
Expand Down Expand Up @@ -100,9 +102,14 @@ func WalkBck(opts *WalkBckOpts) error {
///////////////

func (jg *joggerBck) walk() (err error) {
err = Walk(&jg.opts)
if err = jg.opts.Mi.CheckFS(); err != nil {
nlog.Errorln(err)
mfs.hc.FSHC(cmn.NewErrMountpathChangeRT(err), jg.opts.Mi, "")
} else {
err = Walk(&jg.opts)
}
close(jg.workCh)
return
return err
}

func (jg *joggerBck) cb(fqn string, de DirEntry) error {
Expand Down

0 comments on commit e0a312c

Please sign in to comment.