Skip to content

Commit

Permalink
follow-up: rebalance; archive
Browse files Browse the repository at this point in the history
* reb: log header by strings builder
* archive: refactor 5c94da9

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Aug 24, 2024
1 parent 0a54545 commit 727e45d
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 28 deletions.
4 changes: 2 additions & 2 deletions cmn/archive/err.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Package archive: write, read, copy, append, list primitives
// across all supported formats
/*
* Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
*/
package archive

Expand All @@ -14,7 +14,7 @@ import (

const TarBlockSize = 512 // Size of each block in a tar stream

const FmtErrShortFile = "%s file is too short, should have at least %d size"
const fmtErrTooShort = "%s file is too short, should have at least %d size"

// assorted errors
type (
Expand Down
33 changes: 17 additions & 16 deletions cmn/archive/mime.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,30 +166,31 @@ func MimeFQN(smm *memsys.MMSA, mime, archname string) (m string, err error) {
return
}

func _detect(file cos.LomReader, archname, mine string, buf []byte) (m string, n int, err error) {
n, err = file.Read(buf)
func _detect(file cos.LomReader, archname, mime string, buf []byte) (string, int, error) {
n, err := file.Read(buf)
if err != nil {
return
}
if mine == ExtTar && n < sizeDetectMime {
err = NewErrUnknownFileExt(archname, fmt.Sprintf(FmtErrShortFile, ExtTar, sizeDetectMime))
return
}
if mine == ExtTarGz && n < magicGzip.offset+len(magicGzip.sig) {
err = NewErrUnknownFileExt(archname, fmt.Sprintf(FmtErrShortFile, ExtTarGz, magicGzip.offset+len(magicGzip.sig)))
return
return "", 0, err
}
if mine == ExtTarLz4 && n < magicLz4.offset+len(magicLz4.sig) {
err = NewErrUnknownFileExt(archname, fmt.Sprintf(FmtErrShortFile, ExtTarGz, magicLz4.offset+len(magicLz4.sig)))
return
switch mime {
case ExtTar:
if n < sizeDetectMime {
return "", n, NewErrUnknownFileExt(archname, fmt.Sprintf(fmtErrTooShort, ExtTar, sizeDetectMime))
}
case ExtTarGz:
if l := magicGzip.offset + len(magicGzip.sig) + 4; n < l {
return "", n, NewErrUnknownFileExt(archname, fmt.Sprintf(fmtErrTooShort, ExtTarGz, l))
}
case ExtTarLz4:
if l := magicLz4.offset + len(magicLz4.sig) + 4; n < l {
return "", n, NewErrUnknownFileExt(archname, fmt.Sprintf(fmtErrTooShort, ExtTarGz, l))
}
}
for _, magic := range allMagics {
if n > magic.offset && bytes.HasPrefix(buf[magic.offset:], magic.sig) {
return magic.mime, n, nil
}
}
err = fmt.Errorf("failed to detect supported file signatures in %q", archname)
return
return "", n, fmt.Errorf("failed to detect supported file signatures in %q", archname)
}

func EqExt(ext1, ext2 string) bool {
Expand Down
2 changes: 1 addition & 1 deletion reb/bcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (reb *Reb) rxReady(tsi *meta.Snode, rargs *rebArgs) (ok bool) {
curwt += sleep
}
logHdr, tname := reb.logHdr(rargs.id, rargs.smap), tsi.StringEx()
nlog.Errorf("%s: timed out waiting for %s to reach %s state", logHdr, tname, stages[rebStageTraverse])
nlog.Errorln(logHdr, "timed out waiting for", tname, "to reach", stages[rebStageTraverse], "stage")
return
}

Expand Down
6 changes: 3 additions & 3 deletions reb/recv.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (reb *Reb) recvStageNtfn(hdr *transport.ObjHdr, _ io.Reader, errRx error) e
)
if xreb == nil {
if reb.stages.stage.Load() != rebStageInactive {
nlog.Errorf("%s: nil rebalancing xaction", reb.logHdr(rebID, rsmap))
nlog.Errorln(reb.logHdr(rebID, rsmap), "nil rebalancing xaction")
}
return nil
}
Expand All @@ -120,8 +120,8 @@ func (reb *Reb) recvStageNtfn(hdr *transport.ObjHdr, _ io.Reader, errRx error) e
}
// other's old
if rebID > ntfn.rebID {
nlog.Warningf("%s: stage notification from %s(%s): %s", reb.logHdr(rebID, rsmap),
meta.Tname(ntfn.daemonID), otherStage, reb.warnID(ntfn.rebID, ntfn.daemonID))
nlog.Warningln(reb.logHdr(rebID, rsmap), "stage notification from",
meta.Tname(ntfn.daemonID), "at stage", otherStage+":", reb.warnID(ntfn.rebID, ntfn.daemonID))
return nil
}

Expand Down
22 changes: 16 additions & 6 deletions reb/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package reb
import (
"fmt"
"strconv"
"strings"
"time"

"github.com/NVIDIA/aistore/cmn"
Expand Down Expand Up @@ -38,16 +39,25 @@ func (reb *Reb) xctn() *xs.Rebalance { return reb.xreb.Load() }
func (reb *Reb) setXact(xctn *xs.Rebalance) { reb.xreb.Store(xctn) }

func (reb *Reb) logHdr(rebID int64, smap *meta.Smap, initializing ...bool) string {
smapv := "v<???>"
var sb strings.Builder

sb.WriteString(core.T.String())
sb.WriteString("[g")
sb.WriteString(strconv.FormatInt(rebID, 10))
sb.WriteByte(',')
if smap != nil {
smapv = "v" + strconv.FormatInt(smap.Version, 10)
sb.WriteString(strconv.FormatInt(smap.Version, 10))
} else {
sb.WriteString("v<???>")
}
s := fmt.Sprintf("%s[g%d,%s", core.T, rebID, smapv)
if len(initializing) > 0 {
return s + "]"
sb.WriteByte(']')
return sb.String() // "%s[g%d,%s]"
}
stage := stages[reb.stages.stage.Load()]
return fmt.Sprintf("%s,%s]", s, stage)
sb.WriteByte(',')
sb.WriteString(stages[reb.stages.stage.Load()])
sb.WriteByte(']')
return sb.String() // "%s[g%d,%s,%s]"
}

func (reb *Reb) warnID(remoteID int64, tid string) (s string) {
Expand Down
3 changes: 3 additions & 0 deletions xact/xs/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ func (r *XactArch) Do(msg *cmn.ArchiveBckMsg) {
r.cleanup()
}

if r.IsAborted() {
return
}
wi.j.workCh <- &archtask{wi, lrit}
if r.Err() != nil {
wi.cleanup()
Expand Down

0 comments on commit 727e45d

Please sign in to comment.