Skip to content

Commit

Permalink
logs: replace nlog with fmt for ishard stdout
Browse files Browse the repository at this point in the history
* `nlog` shouldn't be used for just printing info to stdout/stderr.
replaced with `fmt` instead.
* added error handling in archive goroutines

Signed-off-by: Tony Chen <[email protected]>
  • Loading branch information
Nahemah1022 committed Jul 25, 2024
1 parent 482e720 commit 0bbdc86
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 18 deletions.
17 changes: 8 additions & 9 deletions cmd/ishard/ishard/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/NVIDIA/aistore/api/apc"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/nlog"
"github.com/NVIDIA/aistore/ext/dsort"
)

Expand Down Expand Up @@ -179,13 +178,13 @@ func parseCliParams(cfg *Config) {
flag.Parse()

if cfg.MaxShardSize, err = cos.ParseSize(maxShardSizeStr, cos.UnitsIEC); err != nil {
nlog.Errorf("Invalid max_shard_size format: %s. Error: %v", maxShardSizeStr, err)
fmt.Fprintln(os.Stderr, err)
flag.Usage()
os.Exit(1)
}

if _, ok := MissingExtActMap[cfg.MissingExtAction]; !ok {
nlog.Errorf("Invalid action: %s. Accepted values are: abort, warn, ignore", cfg.MissingExtAction)
fmt.Fprintf(os.Stderr, "Invalid action: %s. Accepted values are: abort, warn, ignore\n", cfg.MissingExtAction)
flag.Usage()
os.Exit(1)
}
Expand All @@ -200,33 +199,33 @@ func parseCliParams(cfg *Config) {
}

if sampleKeyPatternStr == "" {
nlog.Infoln("`sample_key_pattern` is not specified, use `base_file_name` as sample key by default.")
fmt.Println("`sample_key_pattern` is not specified, use `base_file_name` as sample key by default.")
cfg.SampleKeyPattern = BaseFileNamePattern
} else if pattern, ok := commonPatterns[sampleKeyPatternStr]; ok {
cfg.SampleKeyPattern = pattern
} else {
nlog.Infof("`sample_key_pattern` %s is not built-in (`base_file_name` | `full_name` | `collapse_all_dir`), compiled as custom regex.", sampleKeyPatternStr)
fmt.Printf("`sample_key_pattern` %s is not built-in (`base_file_name` | `full_name` | `collapse_all_dir`), compiled as custom regex\n", sampleKeyPatternStr)
if _, err := regexp.Compile(sampleKeyPatternStr); err != nil {
nlog.Errorf("Invalid regex pattern: %s. Error: %v", cfg.SampleKeyPattern, err)
fmt.Fprintln(os.Stderr, err)
flag.Usage()
os.Exit(1)
}
cfg.SampleKeyPattern = SampleKeyPattern{Regex: sampleKeyPatternStr, CaptureGroup: "$1"}
}

if cfg.SrcBck.Name == "" || cfg.DstBck.Name == "" {
nlog.Errorln("Error: src_bck and dst_bck are required parameters.")
fmt.Fprintln(os.Stderr, "Error: src_bck and dst_bck are required parameters.")
flag.Usage()
os.Exit(1)
}

if cfg.SrcBck, cfg.SrcPrefix, err = cmn.ParseBckObjectURI(cfg.SrcBck.Name, cmn.ParseURIOpts{DefaultProvider: apc.AIS}); err != nil {
nlog.Errorf("Error on parsing source bucket: %s. Error: %v", cfg.SrcBck.Name, err)
fmt.Fprintln(os.Stderr, err)
flag.Usage()
os.Exit(1)
}
if cfg.DstBck, _, err = cmn.ParseBckObjectURI(cfg.DstBck.Name, cmn.ParseURIOpts{DefaultProvider: apc.AIS}); err != nil {
nlog.Errorf("Error on parsing destination bucket: %s. Error: %v", cfg.SrcBck.Name, err)
fmt.Fprintln(os.Stderr, err)
flag.Usage()
os.Exit(1)
}
Expand Down
5 changes: 2 additions & 3 deletions cmd/ishard/ishard/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/NVIDIA/aistore/cmd/ishard/ishard/config"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/nlog"
"github.com/NVIDIA/aistore/ext/dsort/shard"
"github.com/vbauerster/mpb/v4"
)
Expand Down Expand Up @@ -124,7 +123,7 @@ func (sf *ShardFactory) Create(recs *shard.Records, size int64, errCh chan error
sf.dryRunCLIMu.Lock()
w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', tabwriter.AlignRight)
if err := sf.CLItemplate.Execute(w, sh); err != nil {
fmt.Println("error executing template: %w", err)
fmt.Fprintln(os.Stderr, err)
}
w.Flush()
sf.dryRunCLIMu.Unlock()
Expand Down Expand Up @@ -167,7 +166,7 @@ func (sf *ShardFactory) poll() {
time.Sleep(backoff)
shardList, err := api.ListObjects(sf.baseParams, sf.toBck, &apc.LsoMsg{Prefix: sf.shardIter.Prefix, Flags: apc.LsNameSize}, api.ListArgs{})
if err != nil {
nlog.Errorln(err)
fmt.Fprintln(os.Stderr, err)
continue
}
for _, entry := range shardList.Entries {
Expand Down
17 changes: 14 additions & 3 deletions cmd/ishard/ishard/ishard.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/NVIDIA/aistore/cmd/ishard/ishard/factory"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/nlog"
"github.com/NVIDIA/aistore/ext/dsort/shard"
)

Expand Down Expand Up @@ -108,6 +107,12 @@ func (is *ISharder) archive(n *dirNode, path string) (parentRecords *shard.Recor
)

for name, child := range n.children {
select {
case err := <-errCh:
return nil, 0, err
default:
}

fullPath := path + "/" + name
if path == "" {
fullPath = name
Expand Down Expand Up @@ -137,6 +142,12 @@ func (is *ISharder) archive(n *dirNode, path string) (parentRecords *shard.Recor

n.records.Lock()
for _, record := range n.records.All() {
select {
case err := <-errCh:
return nil, 0, err
default:
}

totalSize += record.TotalSize()
recs.Insert(record)

Expand Down Expand Up @@ -266,12 +277,12 @@ func (is *ISharder) Start() error {
objTotalSize += en.Size
}

fmt.Printf("Listed Object Total Size: %s\n", cos.ToSizeIEC(objTotalSize, 2))

if objListPage.ContinuationToken == "" {
break
}
msg.ContinuationToken = objListPage.ContinuationToken

nlog.Infof("Listed Object Total Size: %s\n", cos.ToSizeIEC(objTotalSize, 2))
}

if is.shardFactory, err = factory.NewShardFactory(is.baseParams, is.cfg.SrcBck, is.cfg.DstBck, is.cfg.Ext, is.cfg.ShardTemplate, is.cfg.DryRunFlag); err != nil {
Expand Down
5 changes: 2 additions & 3 deletions cmd/ishard/ishard/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/NVIDIA/aistore/api"
"github.com/NVIDIA/aistore/api/apc"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/nlog"
"github.com/NVIDIA/aistore/ext/dsort"
)

Expand All @@ -35,13 +34,13 @@ func (is *ISharder) sort(shardNames []string) (string, error) {
Algorithm: is.cfg.Algorithm,
}

nlog.Infoln("dsort started...")
fmt.Println("dsort started...")
return api.StartDsort(is.baseParams, &spec)
}

// Helper function to wait for the dsort job with a given UUID and timeout
func (is *ISharder) waitSort(dsortManagerUUID string, timeout *time.Duration) error {
nlog.Infof("waiting for dsort[%s]\n", dsortManagerUUID)
fmt.Printf("waiting for dsort[%s]\n", dsortManagerUUID)

waitTimeout := DsortDefaultTimeout
if timeout != nil {
Expand Down

0 comments on commit 0bbdc86

Please sign in to comment.