diff --git a/cmn/cos/io.go b/cmn/cos/io.go index 14124dea774..1e426973a34 100644 --- a/cmn/cos/io.go +++ b/cmn/cos/io.go @@ -283,7 +283,7 @@ func NewCallbackReadOpenCloser(r ReadOpenCloser, readCb func(int, error), report func (r *CallbackROC) Read(p []byte) (n int, err error) { n, err = r.roc.Read(p) - debug.Assert(r.readBytes > math.MaxInt-n) + debug.Assert(r.readBytes < math.MaxInt-n) r.readBytes += n if r.readBytes > r.reportedBytes { diff := r.readBytes - r.reportedBytes diff --git a/cmn/err.go b/cmn/err.go index 907fb77355e..35abf2cbd6b 100644 --- a/cmn/err.go +++ b/cmn/err.go @@ -150,6 +150,9 @@ type ( disks []string fsdisks []string } + ErrMountpathChangeRT struct { + err error + } ErrInvalidFSPathsConf struct { err error @@ -583,6 +586,21 @@ func IsErrMountpathNewDisk(err error) bool { return ok } +// ErrMountpathChangeRT + +func NewErrMountpathChangeRT(err error) *ErrMountpathChangeRT { + return &ErrMountpathChangeRT{err: err} +} + +func (e *ErrMountpathChangeRT) Error() string { + return e.err.Error() +} + +func IsErrMountpathChangeRT(err error) bool { + _, ok := err.(*ErrMountpathChangeRT) + return ok +} + // ErrInvalidFSPathsConf func NewErrInvalidFSPathsConf(err error) *ErrInvalidFSPathsConf { diff --git a/fs/fs.go b/fs/fs.go index c368d912b59..8a91ad527f8 100644 --- a/fs/fs.go +++ b/fs/fs.go @@ -119,10 +119,21 @@ func NewMountpath(mpath string, label ios.Label) (*Mountpath, error) { Label: label, PathDigest: xxhash.Checksum64S(cos.UnsafeB(cleanMpath), cos.MLCG32), } - err = mi.ResolveFS() + err = mi.resolveFS() return mi, err } +func (mi *Mountpath) CheckFS() (err error) { + dup := Mountpath{Path: mi.Path} + err = dup.resolveFS() + if err == nil { + if !dup.FS.Equal(mi.FS) { + err = fmt.Errorf("%s: detected filesystem change at runtime: %s => %s ", mi, mi.FS.String(), dup.FS.String()) + } + } + return err +} + // flags func (mi *Mountpath) SetFlags(flags uint64) (ok bool) { return cos.SetfAtomic(&mi.flags, flags) diff --git a/fs/fs_darwin.go b/fs/fs_darwin.go index c841500430e..7fd4530ef35 100644 --- a/fs/fs_darwin.go +++ b/fs/fs_darwin.go @@ -10,7 +10,7 @@ import ( "syscall" ) -func (mi *Mountpath) ResolveFS() error { +func (mi *Mountpath) resolveFS() error { var fsStats syscall.Statfs_t if err := syscall.Statfs(mi.Path, &fsStats); err != nil { return fmt.Errorf("cannot statfs fspath %q, err: %w", mi.Path, err) diff --git a/fs/fs_linux.go b/fs/fs_linux.go index f6adf91de64..6a4b406419a 100644 --- a/fs/fs_linux.go +++ b/fs/fs_linux.go @@ -13,7 +13,7 @@ import ( "syscall" ) -func (mi *Mountpath) ResolveFS() error { +func (mi *Mountpath) resolveFS() error { var fsStats syscall.Statfs_t if err := syscall.Statfs(mi.Path, &fsStats); err != nil { return fmt.Errorf("cannot statfs fspath %q, err: %w", mi.Path, err) diff --git a/fs/health/fshc.go b/fs/health/fshc.go index 5f6190057ed..d4ffe227bf4 100644 --- a/fs/health/fshc.go +++ b/fs/health/fshc.go @@ -51,7 +51,6 @@ func (f *FSHC) run(mi *fs.Mountpath, fqn string) { serr string rerrs, werrs int cfg = cmn.GCO.Get().FSHC - dup = fs.Mountpath{Path: mi.Path} ) // 1. fstat err := cos.Stat(mi.Path) @@ -65,13 +64,7 @@ func (f *FSHC) run(mi *fs.Mountpath, fqn string) { } // 2. resolve FS - err = dup.ResolveFS() - if err == nil { - if !dup.FS.Equal(mi.FS) { - err = fmt.Errorf("%s: detected filesystem change (%s => %s) at runtime", mi, mi.FS.String(), dup.FS.String()) - } - } - if err != nil { + if err = mi.CheckFS(); err != nil { nlog.Errorln(err) goto disable } diff --git a/fs/health/serialize.go b/fs/health/serialize.go index 3baa1dda550..5a17a16a4a8 100644 --- a/fs/health/serialize.go +++ b/fs/health/serialize.go @@ -26,7 +26,7 @@ type ror struct { var all sync.Map // [mpath => ror] func (*FSHC) IsErr(err error) bool { - return cmn.IsErrGetCap(err) || cos.IsIOError(err) + return cmn.IsErrGetCap(err) || cmn.IsErrMountpathChangeRT(err) || cos.IsIOError(err) } func (f *FSHC) OnErr(mi *fs.Mountpath, fqn string) { diff --git a/fs/walkbck.go b/fs/walkbck.go index e667afa4e96..39052b4c4d5 100644 --- a/fs/walkbck.go +++ b/fs/walkbck.go @@ -10,6 +10,7 @@ import ( "github.com/NVIDIA/aistore/cmn" "github.com/NVIDIA/aistore/cmn/debug" + "github.com/NVIDIA/aistore/cmn/nlog" "golang.org/x/sync/errgroup" ) @@ -43,6 +44,7 @@ type ( wbeHeap []wbeInfo ) +// lso and tests func WalkBck(opts *WalkBckOpts) error { debug.Assert(opts.Mi == nil && opts.Sorted) // TODO: support `opts.Sorted == false` var ( @@ -100,9 +102,14 @@ func WalkBck(opts *WalkBckOpts) error { /////////////// func (jg *joggerBck) walk() (err error) { - err = Walk(&jg.opts) + if err = jg.opts.Mi.CheckFS(); err != nil { + nlog.Errorln(err) + mfs.hc.FSHC(cmn.NewErrMountpathChangeRT(err), jg.opts.Mi, "") + } else { + err = Walk(&jg.opts) + } close(jg.workCh) - return + return err } func (jg *joggerBck) cb(fqn string, de DirEntry) error {