From 99b7a961a327d08593541bc87a5c1d79a23bcfc3 Mon Sep 17 00:00:00 2001 From: Alex Aizman Date: Wed, 25 Sep 2024 14:37:55 -0400 Subject: [PATCH] CLI: 'ais put --retries N' with increasing timeout, if need be Signed-off-by: Alex Aizman --- cmd/cli/cli/const.go | 5 +++ cmd/cli/cli/err.go | 28 ++++++++++++++++ cmd/cli/cli/object_hdlr.go | 4 ++- cmd/cli/cli/verbfobj.go | 65 +++++++++++++++++++++++++++++++++++--- 4 files changed, 96 insertions(+), 6 deletions(-) diff --git a/cmd/cli/cli/const.go b/cmd/cli/cli/const.go index faab8ac7577..58e74a575f1 100644 --- a/cmd/cli/cli/const.go +++ b/cmd/cli/cli/const.go @@ -863,6 +863,11 @@ var ( Usage: "[end-to-end protection] compute client-side checksum configured for the destination bucket\n" + putObjCksumText, } + putRetriesFlag = cli.IntFlag{ + Name: "retries", + Value: 1, + Usage: "retry PUT operation so many times (with increasing timeout if timed out)", + } appendConcatFlag = cli.BoolFlag{ Name: "append", diff --git a/cmd/cli/cli/err.go b/cmd/cli/cli/err.go index 49a86b8024e..cb27c785771 100644 --- a/cmd/cli/cli/err.go +++ b/cmd/cli/cli/err.go @@ -349,6 +349,7 @@ func V(err error) error { return err } +// with hints and tips (compare with `stripErr` below) func formatErr(err error) error { if err == nil { return nil @@ -382,6 +383,33 @@ func formatErr(err error) error { } } +// remove "verb URL" from the error (compare with `formatErr`) +// TODO: add more apc.URLPath* paths +func stripErr(err error) error { + var ( + s = err.Error() + l int + ) + i := strings.Index(s, apc.URLPathObjects.S) + if i < 0 { + i = strings.Index(s, apc.URLPathBuckets.S) + } + if i < 0 { + return err + } + + k := strings.Index(s[i+l:], " ") + if k < 0 || len(s) < i+l+k+5 { + return err + } + return errors.New(s[i+l+k+1:]) +} + +func isTimeout(err error) bool { + s := strings.ToLower(err.Error()) + return strings.Contains(s, "timeout") || strings.Contains(s, "deadline") +} + func isStartingUp(err error) bool { if herr, ok := err.(*cmn.ErrHTTP); ok { 
return herr.Status == http.StatusServiceUnavailable diff --git a/cmd/cli/cli/object_hdlr.go b/cmd/cli/cli/object_hdlr.go index 9c773356778..d423496b076 100644 --- a/cmd/cli/cli/object_hdlr.go +++ b/cmd/cli/cli/object_hdlr.go @@ -125,6 +125,7 @@ var ( yesFlag, continueOnErrorFlag, unitsFlag, + putRetriesFlag, // cksum skipVerCksumFlag, putObjDfltCksumFlag, @@ -319,7 +320,8 @@ func putHandler(c *cli.Context) error { a.dst.oname += a.src.arg } if err := putRegular(c, a.dst.bck, a.dst.oname, a.src.abspath, a.src.finfo); err != nil { - return err + e := stripErr(err) + return fmt.Errorf("failed to %s %s => %s: %v", a.verb(), a.src.abspath, a.dst.bck.Cname(a.dst.oname), e) } actionDone(c, fmt.Sprintf("%s %q => %s\n", a.verb(), a.src.arg, a.dst.bck.Cname(a.dst.oname))) return nil diff --git a/cmd/cli/cli/verbfobj.go b/cmd/cli/cli/verbfobj.go index 03ec23b63db..fe0c48962ad 100644 --- a/cmd/cli/cli/verbfobj.go +++ b/cmd/cli/cli/verbfobj.go @@ -188,7 +188,7 @@ func (p *uparams) do(c *cli.Context) error { return nil } -func (p *uparams) _putOne(c *cli.Context, fobj fobj, reader cos.ReadOpenCloser, skipVC bool) (err error) { +func (p *uparams) _putOne(c *cli.Context, fobj fobj, reader cos.ReadOpenCloser, skipVC, isTout bool) (err error) { if p.dryRun { fmt.Fprintf(c.App.Writer, "%s %s -> %s\n", p.wop.verb(), fobj.path, p.bck.Cname(fobj.dstName)) return @@ -202,6 +202,9 @@ func (p *uparams) _putOne(c *cli.Context, fobj fobj, reader cos.ReadOpenCloser, Size: uint64(fobj.size), SkipVC: skipVC, } + if isTout { + putArgs.BaseParams.Client.Timeout = longClientTimeout + } _, err = api.PutObject(&putArgs) return } @@ -305,10 +308,36 @@ func (u *uctx) do(c *cli.Context, p *uparams, fobj fobj, fh *cos.FileHandle, upd err error skipVC = flagIsSet(c, skipVerCksumFlag) countReader = cos.NewCallbackReadOpenCloser(fh, updateBar /*progress callback*/) + retries = 1 + isTout bool ) + if flagIsSet(c, putRetriesFlag) { + retries = max(parseIntFlag(c, putRetriesFlag), 1) + } switch p.wop.verb() { 
case "PUT": - err = p._putOne(c, fobj, countReader, skipVC) + for i := range retries { + err = p._putOne(c, fobj, countReader, skipVC, isTout) + if err == nil { + if i > 0 { + fmt.Fprintf(c.App.Writer, "[#%d] %s - done.\n", i+1, fobj.path) + } + break + } + e := stripErr(err) + if i < retries-1 { + s := fmt.Sprintf("[#%d] %s: %v - retrying...", i+1, fobj.path, e) + fmt.Fprintln(c.App.ErrWriter, s) + time.Sleep(time.Second) + ffh, errO := fh.Open() + if errO != nil { + fmt.Fprintf(c.App.ErrWriter, "failed to reopen %s: %v\n", fobj.path, errO) + break + } + countReader = cos.NewCallbackReadOpenCloser(ffh, updateBar /*progress callback*/) + isTout = isTimeout(e) + } + } case "APPEND": err = p._a2aOne(c, fobj, countReader, skipVC) default: @@ -317,11 +346,12 @@ func (u *uctx) do(c *cli.Context, p *uparams, fobj fobj, fh *cos.FileHandle, upd return } if err != nil { - str := fmt.Sprintf("Failed to %s %s: %v\n", p.wop.verb(), p.bck.Cname(fobj.dstName), err) + e := stripErr(err) + str := fmt.Sprintf("Failed to %s %s => %s: %v\n", p.wop.verb(), fobj.path, p.bck.Cname(fobj.dstName), e) if u.showProgress { u.errSb.WriteString(str) } else { - fmt.Fprint(c.App.Writer, str) + fmt.Fprint(c.App.ErrWriter, str) } u.errCount.Inc() } else if u.verbose && !u.showProgress && !p.dryRun { @@ -361,6 +391,7 @@ func putRegular(c *cli.Context, bck cmn.Bck, objName, path string, finfo os.File progress *mpb.Progress bars []*mpb.Bar cksum *cos.Cksum + retries = 1 ) if flagIsSet(c, dryRunFlag) { // resulting message printed upon return @@ -391,10 +422,34 @@ func putRegular(c *cli.Context, bck cmn.Bck, objName, path string, finfo os.File Cksum: cksum, SkipVC: flagIsSet(c, skipVerCksumFlag), } - _, err = api.PutObject(&putArgs) + + if flagIsSet(c, putRetriesFlag) { + retries = max(parseIntFlag(c, putRetriesFlag), 1) + } + + for i := range retries { + _, err = api.PutObject(&putArgs) + if err == nil { + if i > 0 { + fmt.Fprintf(c.App.Writer, "[#%d] %s - done.\n", i+1, path) + } + break + } + e := 
stripErr(err) + if i < retries-1 { + s := fmt.Sprintf("[#%d] %s: %v - retrying...", i+1, path, e) + fmt.Fprintln(c.App.ErrWriter, s) + time.Sleep(time.Second) + putArgs.Reader, err = fh.Open() + if isTimeout(e) { + putArgs.BaseParams.Client.Timeout = longClientTimeout + } + } + } if progress != nil { progress.Wait() } + return err }