disk metrics; CLI: verbose counters, empty version
* do not build disk metric names at runtime
* CLI: skip internal (lcache, stream) counters unless verbose
* CLI: version check vs. nodes in maintenance

Signed-off-by: Alex Aizman <[email protected]>
alex-aizman committed Jul 24, 2024
1 parent 0259869 commit bd927bc
Showing 5 changed files with 102 additions and 38 deletions.
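For orientation before the per-file diffs: the first bullet above replaces per-call string building with a per-disk lookup of prebuilt metric names. Below is a minimal, self-contained sketch of that memoization pattern; the type and names here (`metricNames`, the `"disk"` prefix) are illustrative stand-ins, not the actual implementation, which lives in `stats/target_stats.go` further down (`dmetric`, `Trunner._dmetric`).

```go
package main

import (
	"fmt"
	"strings"
)

// metricNames caches fully qualified disk-metric names ("disk.<disk>.<metric>")
// so each name is built once per disk instead of on every stats update.
type metricNames struct {
	byDisk map[string]map[string]string // disk => metric => full name
}

func newMetricNames() *metricNames {
	return &metricNames{byDisk: make(map[string]map[string]string)}
}

// name returns the cached full metric name, building and storing it on first use.
func (mn *metricNames) name(disk, metric string) string {
	m, ok := mn.byDisk[disk]
	if !ok {
		m = make(map[string]string, 5)
		mn.byDisk[disk] = m
	}
	if full, ok := m[metric]; ok {
		return full // fast path: no string building at runtime
	}
	var sb strings.Builder
	sb.WriteString("disk")
	sb.WriteByte('.')
	sb.WriteString(disk)
	sb.WriteByte('.')
	sb.WriteString(metric)
	full := sb.String()
	m[metric] = full
	return full
}

func main() {
	mn := newMetricNames()
	fmt.Println(mn.name("sda", "read.bps")) // built and cached
	fmt.Println(mn.name("sda", "read.bps")) // served from the cache
	fmt.Println(mn.name("sda", "util"))
}
```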
2 changes: 1 addition & 1 deletion ais/target.go
@@ -761,7 +761,7 @@ func (t *target) getObject(w http.ResponseWriter, r *http.Request, dpq *dpq, bck

// do
if ecode, err := goi.getObject(); err != nil {
- if !goi.softIOErr {
+ if !goi.isIOErr {
debug.Assert(err != errSendingResp && !cos.IsRetriableConnErr(err), err)
t.statsT.IncNonIOErr()
}
14 changes: 7 additions & 7 deletions ais/tgtobj.go
@@ -87,7 +87,7 @@ type (
retry bool // once
cold bool // true if executed backend.Get
latestVer bool // QparamLatestVer || 'versioning.*_warm_get'
- softIOErr bool // to count GET error as a "soft IO error"
+ isIOErr bool // to count GET error as a "IO error"; see `Trunner._softErrs()`
}

// textbook append: (packed) handle and control structure (see also `putA2I` arch below)
@@ -558,7 +558,7 @@ do:
if err != nil {
cold = cos.IsNotExist(err, 0)
if !cold {
- goi.softIOErr = true
+ goi.isIOErr = true
return http.StatusInternalServerError, err
}
if goi.lom.IsFeatureSet(feat.DisableColdGET) && goi.lom.Bck().IsRemote() {
@@ -583,7 +583,7 @@ do:
er2 := lom2.InitBck(goi.lom.Bucket())
if er2 == nil {
er2 = lom2.Load(true /*cache it*/, false /*locked*/)
- goi.softIOErr = true
+ goi.isIOErr = true
}
if er2 == nil {
core.FreeLOM(goi.lom)
@@ -599,7 +599,7 @@ do:
}
goi.lom.Lock(false)
if err = goi.lom.Load(true /*cache it*/, true /*locked*/); err != nil {
- goi.softIOErr = true
+ goi.isIOErr = true
return 0, err
}
goto fin // ok, done
@@ -1061,7 +1061,7 @@ func (goi *getOI) _txrng(fqn string, lmfh *os.File, whdr http.Header, hrng *htra
_, cksumH, err := cos.CopyAndChecksum(sgl /*as ReaderFrom*/, r, nil, ckconf.Type)
if err != nil {
sgl.Free()
- goi.softIOErr = true
+ goi.isIOErr = true
return err
}
r = sgl
@@ -1127,7 +1127,7 @@ func (goi *getOI) _txarch(fqn string, lmfh *os.File, whdr http.Header) error {
var csl cos.ReadCloseSizer
csl, err = ar.ReadOne(dpq.arch.path)
if err != nil {
- goi.softIOErr = true
+ goi.isIOErr = true
return cmn.NewErrFailedTo(goi.t, "extract "+dpq._archstr()+" from", lom.Cname(), err)
}
if csl == nil {
@@ -1148,7 +1148,7 @@ func (goi *getOI) _txarch(fqn string, lmfh *os.File, whdr http.Header) error {
whdr.Set(cos.HdrContentType, cos.ContentTar)
err = ar.ReadUntil(rcb, dpq.arch.regx, dpq.arch.mmode)
if err != nil {
- goi.softIOErr = true
+ goi.isIOErr = true
err = cmn.NewErrFailedTo(goi.t, "extract files that match "+dpq._archstr()+" from", lom.Cname(), err)
}
if err == nil && rcb.num == 0 {
19 changes: 19 additions & 0 deletions cmd/cli/cli/performance.go
@@ -29,6 +29,16 @@ type (
// true when called by top-level handler
var allPerfTabs bool

+ var verboseCounters = [...]string{
+ stats.LcacheCollisionCount,
+ stats.LcacheEvictedCount,
+ stats.LcacheFlushColdCount,
+ cos.StreamsOutObjCount,
+ cos.StreamsOutObjSize,
+ cos.StreamsInObjCount,
+ cos.StreamsInObjSize,
+ }
+
var (
showPerfFlags = append(
longRunFlags,
@@ -37,6 +47,7 @@
unitsFlag,
averageSizeFlag,
nonverboseFlag,
+ verboseFlag,
)

// `show performance` command
@@ -142,6 +153,14 @@ func showCountersHandler(c *cli.Context) error {

for name, kind := range metrics {
if metrics[name] == stats.KindCounter || metrics[name] == stats.KindSize {
+ //
+ // skip assorted internal counters and sizes, unless verbose
+ //
+ if !flagIsSet(c, verboseFlag) {
+ if cos.StringInSlice(name, verboseCounters[:]) {
+ continue
+ }
+ }
selected[name] = kind
}
}
15 changes: 14 additions & 1 deletion cmd/cli/cli/stats.go
@@ -96,6 +96,8 @@ func isRebalancing(tstatusMap teb.StstMap) bool {
}

func checkVersionWarn(c *cli.Context, role string, mmc []string, stmap teb.StstMap) bool {
+ const fmtEmptyVer = "empty version from %s (in maintenance mode?)"
+
expected := mmc[0] + versionSepa + mmc[1]
minc, err := strconv.Atoi(mmc[1])
if err != nil {
@@ -106,7 +108,10 @@
}
for _, ds := range stmap {
if ds.Version == "" {
warn := fmt.Sprintf("empty version from %s (in maintenance mode?)", ds.Node.Snode.StringEx())
if ds.Node.Snode.InMaintOrDecomm() {
continue
}
warn := fmt.Sprintf(fmtEmptyVer, ds.Node.Snode.StringEx())
actionWarn(c, warn)
continue
}
@@ -140,6 +145,14 @@
// ditto
var cnt int
for _, ds2 := range stmap {
+ if ds2.Node.Snode.InMaintOrDecomm() {
+ continue
+ }
+ if ds2.Version == "" {
+ warn := fmt.Sprintf(fmtEmptyVer, ds2.Node.Snode.StringEx())
+ actionWarn(c, warn)
+ continue
+ }
if ds.Node.Snode.ID() != ds2.Node.Snode.ID() {
mmx2 := strings.Split(ds2.Version, versionSepa)
minx2, _ := strconv.Atoi(mmx2[1])
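The cmd/cli/cli/stats.go hunks above make the version check skip nodes that are in maintenance or being decommissioned, and reuse a single format string for the empty-version warning. A minimal, self-contained sketch of that guard, with hypothetical types standing in for the CLI's `teb.StstMap` entries and `Snode` methods:

```go
package main

import "fmt"

// node is an illustrative stand-in for a cluster-map entry; the real CLI reads
// ds.Version and calls ds.Node.Snode.InMaintOrDecomm() on teb.StstMap values.
type node struct {
	id      string
	version string
	inMaint bool
}

const fmtEmptyVer = "empty version from %s (in maintenance mode?)"

// checkVersions warns about empty versions and returns the IDs of nodes whose
// version differs from the expected one, skipping nodes in maintenance.
func checkVersions(nodes []node, expected string) (mismatched []string) {
	for _, n := range nodes {
		if n.inMaint {
			continue // maintenance/decommission: version may be empty or stale by design
		}
		if n.version == "" {
			fmt.Printf(fmtEmptyVer+"\n", n.id)
			continue
		}
		if n.version != expected {
			mismatched = append(mismatched, n.id)
		}
	}
	return mismatched
}

func main() {
	nodes := []node{
		{id: "t1", version: "3.24"},
		{id: "t2", inMaint: true}, // skipped: in maintenance
		{id: "t3", version: "3.23"},
	}
	fmt.Println("version mismatch:", checkVersions(nodes, "3.24")) // [t3]
}
```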
90 changes: 61 additions & 29 deletions stats/target_stats.go
@@ -92,13 +92,18 @@ const (
)

type (
+ dmetric map[string]string // "read.bps" => full metric name, etc.
+
Trunner struct {
runner // the base (compare w/ Prunner)
t core.Target
Tcdf fs.Tcdf `json:"cdf"`
- disk ios.AllDiskStats
- xln string
- cs struct {
+ disk struct {
+ stats ios.AllDiskStats // numbers
+ metrics map[string]dmetric // respective names
+ }
+ xln string
+ cs struct {
last int64 // mono.Nano
}
softErrs int64 // numSoftErrs(); to monitor the change
@@ -139,7 +144,9 @@ func (r *Trunner) Init() *atomic.Bool {

r.ctracker = make(copyTracker, numTargetStats) // these two are allocated once and only used in serial context
r.lines = make([]string, 0, 16)
- r.disk = make(ios.AllDiskStats, 16)

+ r.disk.stats = make(ios.AllDiskStats, 16)
+ r.disk.metrics = make(map[string]dmetric, 16)

config := cmn.GCO.Get()
r.core.statsTime = config.Periodic.StatsTime.D()
@@ -171,22 +178,44 @@ func (r *Trunner) InitCDF(config *cmn.Config) error {
return nil
}

- // TODO: use map
- func _dmetric(disk, metric string) string {
+ func (r *Trunner) _dmetric(disk, metric string) string {
var sb strings.Builder
sb.WriteString(diskMetricLabel)
sb.WriteByte('.')
sb.WriteString(disk)
sb.WriteByte('.')
sb.WriteString(metric)
- return sb.String()
+ fullname := sb.String()
+
+ m, ok := r.disk.metrics[disk]
+ if !ok {
+ debug.Assert(metric == "read.bps", metric)
+ m = make(map[string]string, 5)
+ r.disk.metrics[disk] = m
+
+ // init all the rest, as per ios.DiskStats
+ r._dmetric(disk, "avg.rsize")
+ r._dmetric(disk, "write.bps")
+ r._dmetric(disk, "avg.wsize")
+ r._dmetric(disk, "util")
+ }
+ m[metric] = fullname
+ return fullname
}

- func nameRbps(disk string) string { return _dmetric(disk, "read.bps") }
- func nameRavg(disk string) string { return _dmetric(disk, "avg.rsize") }
- func nameWbps(disk string) string { return _dmetric(disk, "write.bps") }
- func nameWavg(disk string) string { return _dmetric(disk, "avg.wsize") }
- func nameUtil(disk string) string { return _dmetric(disk, "util") }
+ // NOTE: must always be called first and prior to all the other disk-naming metrics (below)
+ func (r *Trunner) nameRbps(disk string) string {
+ if dmetric, ok := r.disk.metrics[disk]; ok {
+ return dmetric["read.bps"]
+ }
+ // init & slow path
+ return r._dmetric(disk, "read.bps")
+ }
+
+ func (r *Trunner) nameRavg(disk string) string { return r.disk.metrics[disk]["avg.rsize"] }
+ func (r *Trunner) nameWbps(disk string) string { return r.disk.metrics[disk]["write.bps"] }
+ func (r *Trunner) nameWavg(disk string) string { return r.disk.metrics[disk]["avg.wsize"] }
+ func (r *Trunner) nameUtil(disk string) string { return r.disk.metrics[disk]["util"] }

// log vs idle logic
func isDiskMetric(name string) bool {
@@ -253,16 +282,18 @@ func (r *Trunner) RegMetrics(snode *meta.Snode) {
}

func (r *Trunner) RegDiskMetrics(snode *meta.Snode, disk string) {
- s, n := r.core.Tracker, nameRbps(disk)
- if _, ok := s[n]; ok { // must be config.TestingEnv()
+ s := r.core.Tracker
+ rbps := r.nameRbps(disk)
+ if _, ok := s[rbps]; ok { // must be config.TestingEnv()
return
}
- r.reg(snode, n, KindComputedThroughput)
- r.reg(snode, nameWbps(disk), KindComputedThroughput)
+ r.reg(snode, rbps, KindComputedThroughput)
+ r.reg(snode, r.nameRavg(disk), KindGauge)
+
+ r.reg(snode, r.nameWbps(disk), KindComputedThroughput)
+ r.reg(snode, r.nameWavg(disk), KindGauge)

- r.reg(snode, nameRavg(disk), KindGauge)
- r.reg(snode, nameWavg(disk), KindGauge)
- r.reg(snode, nameUtil(disk), KindGauge)
+ r.reg(snode, r.nameUtil(disk), KindGauge)
}

func (r *Trunner) GetStats() (ds *Node) {
@@ -331,23 +362,24 @@ func (r *Trunner) log(now int64, uptime time.Duration, config *cmn.Config) {

// 1. disk stats
refreshCap := r.Tcdf.HasAlerts()
- fs.DiskStats(r.disk, nil /*fs.TcdfExt*/, config, refreshCap)
+ fs.DiskStats(r.disk.stats, nil /*fs.TcdfExt*/, config, refreshCap)

s := r.core
- for disk, stats := range r.disk {
- v := s.Tracker[nameRbps(disk)]
+ for disk, stats := range r.disk.stats {
+ n := r.nameRbps(disk)
+ v := s.Tracker[n]
if v == nil {
nlog.Warningln("missing:", nameRbps(disk))
nlog.Warningln("missing:", n)
continue
}
v.Value = stats.RBps
- v = s.Tracker[nameRavg(disk)]
+ v = s.Tracker[r.nameRavg(disk)]
v.Value = stats.Ravg
- v = s.Tracker[nameWbps(disk)]
+ v = s.Tracker[r.nameWbps(disk)]
v.Value = stats.WBps
- v = s.Tracker[nameWavg(disk)]
+ v = s.Tracker[r.nameWavg(disk)]
v.Value = stats.Wavg
- v = s.Tracker[nameUtil(disk)]
+ v = s.Tracker[r.nameUtil(disk)]
v.Value = stats.Util
}

@@ -374,7 +406,7 @@

if !refreshCap && set != 0 {
// refill r.disk (ios.AllDiskStats) prior to logging
- fs.DiskStats(r.disk, nil /*fs.TcdfExt*/, config, true /*refresh cap*/)
+ fs.DiskStats(r.disk.stats, nil /*fs.TcdfExt*/, config, true /*refresh cap*/)
}

// 4. append disk stats to log subject to (idle) filtering (see related: `ignoreIdle`)
@@ -508,7 +540,7 @@ func (r *Trunner) _cap(config *cmn.Config, now int64) (set, clr cos.NodeStateFla
// [ disk: read throughput, average read size, write throughput, average write size, disk utilization ]
// e.g.: [ sda: 94MiB/s, 68KiB, 25MiB/s, 21KiB, 82% ]
func (r *Trunner) logDiskStats(now int64) {
- for disk, stats := range r.disk {
+ for disk, stats := range r.disk.stats {
if stats.Util < minLogDiskUtil/2 || (stats.Util < minLogDiskUtil && now < r.next) {
continue
}
