Skip to content

Commit

Permalink
assorted fixes: fs-linux; CLI iterations; FSHC; OOM periodic
Browse files Browse the repository at this point in the history
* revise (mountpath => filesystem) resolution
  - mountpath vs FS mountpoint; relative path
  - refactor & cleanup; fix and add comments
* CLI: when iterating, perform aistore version check only the first time
  - extend `longRun` singleton - add iteration count
* OOM periodic: flip CAS statement (typo)
* FSHC: flip CAS statement (ditto)
  - refactor & cleanup
* FSHC config: reduce default soft-error limit to 10 (was 100)

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Aug 6, 2024
1 parent 0698ac2 commit 4471151
Show file tree
Hide file tree
Showing 12 changed files with 111 additions and 98 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ help:
"make deploy" "Deploy cluster locally" \
"make kill clean" "Stop locally deployed cluster and cleanup all cluster-related data and bucket metadata (but not cluster map)" \
"make kill deploy <<< $$'7\n2\n4\ny\ny\n'" "Shutdown and then (non-interactively) generate local configs and deploy a cluster consisting of 7 targets (4 mountpaths each) and 2 proxies; build 'aisnode' executable with GCP and AWS backends" \
"TAGS=\"aws gcp\" make kill deploy <<< $$'7\n2\n'" "Same as above" \
"TAGS=\"aws gcp\" make kill deploy <<< $$'7\n2\n'" "Same as above (see docs/getting_started.md for many more examples)" \
"GORACE='log_path=/tmp/race' make deploy" "Deploy cluster with race detector, write reports to /tmp/race.<PID>" \
"MODE=debug make deploy" "Deploy cluster with 'aisnode' (AIS target and proxy) executable built with debug symbols and debug asserts enabled" \
"BUCKET=tmp make test-short" "Run all short tests" \
Expand Down
8 changes: 5 additions & 3 deletions ais/fspathrgrp.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,11 @@ func (g *fsprungroup) doDD(action string, flags uint64, mpath string, dontResilv

core.UncacheMountpath(rmi)

if noResil || dontResilver || !cmn.GCO.Get().Resilver.Enabled {
nlog.Infof("%s: %q %s: no resilvering (%t, %t, %t)", g.t, action, rmi,
noResil, !dontResilver, cmn.GCO.Get().Resilver.Enabled)
config := cmn.GCO.Get()
if noResil || dontResilver || !config.Resilver.Enabled {
nlog.Infoln(g.t.String(), "action", action, rmi.String(), "- not resilvering:")
nlog.Infoln("noResil (action done?):", noResil, "dontResilver:", dontResilver,
"config enabled:", config.Resilver.Enabled)
g.postDD(rmi, action, nil /*xaction*/, nil /*error*/) // ditto (compare with the one below)
return rmi, nil
}
Expand Down
3 changes: 2 additions & 1 deletion ais/tgtspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import (

const (
// - note that an API call (e.g. CLI) will go through anyway
// - compare with cmn/cos/oom.go
// - compare with cmn/cos/oom
// - compare with fs/health/fshc
minAutoDetectInterval = 10 * time.Minute
)

Expand Down
5 changes: 4 additions & 1 deletion ais/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,16 @@ func getLocalIPv4s(config *cmn.Config) (addrlist []*localIPv4Info, err error) {
curr := &localIPv4Info{}
if ipnet, ok := addr.(*net.IPNet); ok {
if ipnet.IP.IsLoopback() {
// K8s: always skip (ie, exclude) 127.0.0.1 loopback
// K8s: always exclude 127.0.0.1 loopback
if k8s.IsK8s() {
continue
}
// non K8s and fspaths:
if !config.TestingEnv() {
if excludeLoopbackIP() {
if ipnet.IP.To4() != nil {
nlog.Warningln("(non-K8s, fspaths) deployment: excluding loopback IP:", ipnet.IP)
}
continue
}
}
Expand Down
7 changes: 5 additions & 2 deletions cmd/cli/cli/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type (
longRun struct {
count int
lfooter int
iters int
refreshRate time.Duration
offset int64
mapBegin, mapEnd teb.StstMap
Expand Down Expand Up @@ -190,6 +191,7 @@ func Run(version, buildtime string, args []string) error {
}
return nil
}
a.longRun.iters = 1
if a.longRun.outFile != nil {
defer a.longRun.outFile.Close()
}
Expand Down Expand Up @@ -225,6 +227,7 @@ func (a *acli) runForever(args []string) error {
if err := a.runOnce(args); err != nil {
return err
}
a.longRun.iters++
a.longRun.mapBegin = a.longRun.mapEnd
a.longRun.mapEnd = nil
}
Expand All @@ -239,12 +242,12 @@ func printLongRunFooter(w io.Writer, repeat int) {
func (a *acli) runN(args []string) error {
delim := fcyan(strings.Repeat("-", 16))
fmt.Fprintln(a.outWriter, delim)
for i := 2; i <= a.longRun.count; i++ {
for ; a.longRun.iters < a.longRun.count; a.longRun.iters++ {
time.Sleep(a.longRun.refreshRate)
if err := a.runOnce(args); err != nil {
return err
}
if i < a.longRun.count {
if a.longRun.iters < a.longRun.count-1 {
fmt.Fprintln(a.outWriter, delim)
}
}
Expand Down
6 changes: 6 additions & 0 deletions cmd/cli/cli/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ func isRebalancing(tstatusMap teb.StstMap) bool {
func checkVersionWarn(c *cli.Context, role string, mmc []string, stmap teb.StstMap) bool {
const fmtEmptyVer = "empty version from %s (in maintenance mode?)"

longParams := getLongRunParams(c)
if longParams != nil && longParams.iters > 0 {
return false // already warned once, nothing to do
}

expected := mmc[0] + versionSepa + mmc[1]
minc, err := strconv.Atoi(mmc[1])
if err != nil {
Expand Down Expand Up @@ -192,6 +197,7 @@ func verWarn(c *cli.Context, snode *meta.Snode, role, version, expected string,
warn = fmt.Sprintf("node %s%s run%s aistore software version %s, which may not be fully compatible with the CLI (expecting v%s)",
sname, s1, s2, version, expected)
}

actionWarn(c, warn+"\n")
}

Expand Down
9 changes: 5 additions & 4 deletions cmn/cos/oom.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ func FreeMemToOS(force bool) bool {
}

now := mono.NanoTime()
if elapsed := time.Duration(now - prev); elapsed < ival {
nlog.Infoln("not running - only", elapsed.String(), "passed since the previous run")
elapsed := time.Duration(now - prev)
if elapsed < ival {
nlog.Infoln("not running - only", elapsed, "passed since the previous run")
return false
}
if ratomic.CompareAndSwapInt64(&runningOOM, 0, now) {
nlog.Infoln("already running, nothing to do")
if !ratomic.CompareAndSwapInt64(&runningOOM, 0, now) {
nlog.Infoln("(still) running for", elapsed, "- nothing to do")
return false
}

Expand Down
10 changes: 3 additions & 7 deletions deploy/dev/local/aisnode_config.fspaths.sh
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ cat > $AIS_CONF_FILE <<EOL
"fshc": {
"test_files": 4,
"error_limit": 2,
"soft_err_limit": 100,
"soft_err_limit": 10,
"soft_err_time": "10s",
"enabled": true
},
Expand Down Expand Up @@ -172,11 +172,7 @@ cat > $AIS_CONF_FILE <<EOL
EOL

## E.g. usage
## 1) with loopbacks:
## deploy/dev/loopback.sh --mountpath /tmp/ais/mp8 --size 1G
## deploy/dev/loopback.sh --mountpath /tmp/ais/mp9 --size 1G
## 2) and without:
## mkdir -p /tmp/ais/mp8 /tmp/ais/mp9
## for i in {1..4}; do deploy/dev/loopback.sh --mountpath /tmp/ais/mp$i --size 1G; done

cat > $AIS_LOCAL_CONF_FILE <<EOL
{
Expand All @@ -190,7 +186,7 @@ cat > $AIS_LOCAL_CONF_FILE <<EOL
"port_intra_control": "${PORT_INTRA_CONTROL:-9080}",
"port_intra_data": "${PORT_INTRA_DATA:-10080}"
},
"fspaths": {"/tmp/ais/mp1": "disk1", "/tmp/ais/mp2": "disk2", "/tmp/ais/mp3": "disk3"},
"fspaths": {"/tmp/ais/mp1": "disk1", "/tmp/ais/mp2": "disk2", "/tmp/ais/mp3": "disk3", "/tmp/ais/mp4": "disk4"},
"test_fspaths": {
"root": "${TEST_FSPATH_ROOT:-/tmp/ais$NEXT_TIER/}",
"count": 0,
Expand Down
2 changes: 1 addition & 1 deletion deploy/dev/local/aisnode_config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ cat > $AIS_CONF_FILE <<EOL
"fshc": {
"test_files": 4,
"error_limit": 2,
"soft_err_limit": 100,
"soft_err_limit": 10,
"soft_err_time": "10s",
"enabled": true
},
Expand Down
50 changes: 31 additions & 19 deletions fs/fs_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,21 @@ import (
"syscall"
)

const procmounts = "/proc/mounts"

func (mi *Mountpath) resolveFS() error {
var fsStats syscall.Statfs_t
if err := syscall.Statfs(mi.Path, &fsStats); err != nil {
return fmt.Errorf("cannot statfs fspath %q, err: %w", mi.Path, err)
return fmt.Errorf("cannot statfs mountpath %q, err: %w", mi.Path, err)
}

fh, e := os.Open(procmounts)
if e != nil {
return fmt.Errorf("FATAL: failed to open Linux %q, err: %w", procmounts, e)
}
fs, fsType, err := fqn2FsInfo(mi.Path)

fs, fsType, err := _resolve(mi.Path, fh)
fh.Close()
if err != nil {
return err
}
Expand All @@ -29,19 +38,15 @@ func (mi *Mountpath) resolveFS() error {
return nil
}

// fqn2FsInfo is used only at startup to store file systems for each mountpath.
func fqn2FsInfo(path string) (fs, fsType string, err error) {
file, err := os.Open("/proc/mounts")
if err != nil {
return "", "", err
}
defer file.Close()
// NOTE: filepath.Rel() returns '.' not an empty string when there is an exact match.
// TODO: consider excepting only exact match (rel == '.') as a match

func _resolve(path string, fh *os.File) (fs, fsType string, _ error) {
var (
bestMatch string
scanner = bufio.NewScanner(file)
scanner = bufio.NewScanner(fh)
)

outer:
for scanner.Scan() {
fields := strings.Fields(scanner.Text())
if len(fields) < 3 {
Expand All @@ -52,23 +57,30 @@ func fqn2FsInfo(path string) (fs, fsType string, err error) {
if err != nil {
continue
}
if bestMatch == "" || len(rel) < len(bestMatch) {
if rel == "." {
// when ais mountpath EQ mount point
return fields[0], fields[2], nil
}
for i := range len(rel) {
if rel[i] != '.' && rel[i] != filepath.Separator {
// mountpath can be a direct descendant (of a mount point),
// with `rel` containing one or more "../" snippets
continue outer
}
}
if bestMatch == "" || len(rel) < len(bestMatch) /*minimizing `rel`*/ {
bestMatch = rel
fs = fields[0]
fsType = fields[2]
}
}

if err := scanner.Err(); err != nil {
return "", "", err
return "", "", fmt.Errorf("FATAL: failed reading Linux %q, err: %w", procmounts, err)
}

// NOTE: `filepath.Rel` returns `.` (not an empty string) when there is an exact match.
if bestMatch == "" {
return "", "", fmt.Errorf("mount point not found for path: %q", path)
return "", "", fmt.Errorf("failed to resolve mountpath %q: mount point not found", path)
}

return
return fs, fsType, nil
}

// DirectOpen opens a file with direct disk access (with OS caching disabled).
Expand Down
48 changes: 48 additions & 0 deletions fs/health/fshc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ import (
"math/rand/v2"
"os"
"path/filepath"
"sync"
ratomic "sync/atomic"
"time"

"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/mono"
"github.com/NVIDIA/aistore/cmn/nlog"
"github.com/NVIDIA/aistore/fs"
)
Expand All @@ -31,6 +34,8 @@ const (
maxNumFiles = 100 // read: upto so many existing files
)

// - compare with cmn/cos/oom
// - compare with ais/tgtspace
const (
minTimeBetweenRuns = 4 * time.Minute
)
Expand All @@ -48,8 +53,51 @@ type (
}
)

type (
ror struct {
last int64
running int64
}
)

// per mountpath: recent-or-running
var all sync.Map // [mpath => ror]

func NewFSHC(t disabler) (f *FSHC) { return &FSHC{t: t} }

func (*FSHC) IsErr(err error) bool {
return cmn.IsErrGetCap(err) || cmn.IsErrMpathCheck(err) || cos.IsIOError(err)
}

// serialize per-mountpath runs
func (f *FSHC) OnErr(mi *fs.Mountpath, fqn string) {
var (
now = mono.NanoTime()
r = &ror{last: now, running: now}
a, loaded = all.LoadOrStore(mi.Path, r)
)
if loaded {
r = a.(*ror)
prev := ratomic.LoadInt64(&r.last)
elapsed := time.Duration(now - prev)
if elapsed < minTimeBetweenRuns {
nlog.Infoln("not enough time passed since the previous run:", elapsed)
return
}
if !ratomic.CompareAndSwapInt64(&r.running, 0, now) {
nlog.Infoln(mi.String(), "still running:", elapsed, "- nothing to do")
return
}
}
go run(f, mi, r, fqn, now)
}

func run(f *FSHC, mi *fs.Mountpath, r *ror, fqn string, now int64) {
f.run(mi, fqn)
ratomic.StoreInt64(&r.last, now)
ratomic.StoreInt64(&r.running, 0)
}

func (f *FSHC) run(mi *fs.Mountpath, fqn string) {
var (
serr, pass string
Expand Down
59 changes: 0 additions & 59 deletions fs/health/serialize.go

This file was deleted.

0 comments on commit 4471151

Please sign in to comment.