Skip to content

Commit

Permalink
CLI: 'ais put --retry <count>' with increasing timeout, if need be
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Sep 25, 2024
1 parent 0fc6e98 commit 99b7a96
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 6 deletions.
5 changes: 5 additions & 0 deletions cmd/cli/cli/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,11 @@ var (
Usage: "[end-to-end protection] compute client-side checksum configured for the destination bucket\n" +
putObjCksumText,
}
putRetriesFlag = cli.IntFlag{
Name: "retries",
Value: 1,
Usage: "retry PUT operation so many times (with increasing timeout if timed out)",
}

appendConcatFlag = cli.BoolFlag{
Name: "append",
Expand Down
28 changes: 28 additions & 0 deletions cmd/cli/cli/err.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ func V(err error) error {
return err
}

// with hints and tips (compare with `stripErr` below)
func formatErr(err error) error {
if err == nil {
return nil
Expand Down Expand Up @@ -382,6 +383,33 @@ func formatErr(err error) error {
}
}

// remove "verb URL" from the error (compare with `formatErr`)
// TODO: add more apc.URLPath* paths
func stripErr(err error) error {
var (
s = err.Error()
l int
)
i := strings.Index(s, apc.URLPathObjects.S)
if i < 0 {
i = strings.Index(s, apc.URLPathBuckets.S)
}
if i < 0 {
return err
}

k := strings.Index(s[i+l:], " ")
if k < 0 || len(s) < i+l+k+5 {
return err
}
return errors.New(s[i+l+k+1:])
}

func isTimeout(err error) bool {
s := strings.ToLower(err.Error())
return strings.Contains(s, "timeout") || strings.Contains(s, "deadline")
}

func isStartingUp(err error) bool {
if herr, ok := err.(*cmn.ErrHTTP); ok {
return herr.Status == http.StatusServiceUnavailable
Expand Down
4 changes: 3 additions & 1 deletion cmd/cli/cli/object_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ var (
yesFlag,
continueOnErrorFlag,
unitsFlag,
putRetriesFlag,
// cksum
skipVerCksumFlag,
putObjDfltCksumFlag,
Expand Down Expand Up @@ -319,7 +320,8 @@ func putHandler(c *cli.Context) error {
a.dst.oname += a.src.arg
}
if err := putRegular(c, a.dst.bck, a.dst.oname, a.src.abspath, a.src.finfo); err != nil {
return err
e := stripErr(err)
return fmt.Errorf("failed to %s %s => %s: %v", a.verb(), a.src.abspath, a.dst.bck.Cname(a.dst.oname), e)
}
actionDone(c, fmt.Sprintf("%s %q => %s\n", a.verb(), a.src.arg, a.dst.bck.Cname(a.dst.oname)))
return nil
Expand Down
65 changes: 60 additions & 5 deletions cmd/cli/cli/verbfobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (p *uparams) do(c *cli.Context) error {
return nil
}

func (p *uparams) _putOne(c *cli.Context, fobj fobj, reader cos.ReadOpenCloser, skipVC bool) (err error) {
func (p *uparams) _putOne(c *cli.Context, fobj fobj, reader cos.ReadOpenCloser, skipVC, isTout bool) (err error) {
if p.dryRun {
fmt.Fprintf(c.App.Writer, "%s %s -> %s\n", p.wop.verb(), fobj.path, p.bck.Cname(fobj.dstName))
return
Expand All @@ -202,6 +202,9 @@ func (p *uparams) _putOne(c *cli.Context, fobj fobj, reader cos.ReadOpenCloser,
Size: uint64(fobj.size),
SkipVC: skipVC,
}
if isTout {
putArgs.BaseParams.Client.Timeout = longClientTimeout
}
_, err = api.PutObject(&putArgs)
return
}
Expand Down Expand Up @@ -305,10 +308,36 @@ func (u *uctx) do(c *cli.Context, p *uparams, fobj fobj, fh *cos.FileHandle, upd
err error
skipVC = flagIsSet(c, skipVerCksumFlag)
countReader = cos.NewCallbackReadOpenCloser(fh, updateBar /*progress callback*/)
retries = 1
isTout bool
)
if flagIsSet(c, putRetriesFlag) {
retries = max(parseIntFlag(c, putRetriesFlag), 1)
}
switch p.wop.verb() {
case "PUT":
err = p._putOne(c, fobj, countReader, skipVC)
for i := range retries {
err = p._putOne(c, fobj, countReader, skipVC, isTout)
if err == nil {
if i > 0 {
fmt.Fprintf(c.App.Writer, "[#%d] %s - done.\n", i+1, fobj.path)
}
break
}
e := stripErr(err)
if i < retries-1 {
s := fmt.Sprintf("[#%d] %s: %v - retrying...", i+1, fobj.path, e)
fmt.Fprintln(c.App.ErrWriter, s)
time.Sleep(time.Second)
ffh, errO := fh.Open()
if errO != nil {
fmt.Fprintf(c.App.ErrWriter, "failed to reopen %s: %v\n", fobj.path, errO)
break
}
countReader = cos.NewCallbackReadOpenCloser(ffh, updateBar /*progress callback*/)
isTout = isTimeout(e)
}
}
case "APPEND":
err = p._a2aOne(c, fobj, countReader, skipVC)
default:
Expand All @@ -317,11 +346,12 @@ func (u *uctx) do(c *cli.Context, p *uparams, fobj fobj, fh *cos.FileHandle, upd
return
}
if err != nil {
str := fmt.Sprintf("Failed to %s %s: %v\n", p.wop.verb(), p.bck.Cname(fobj.dstName), err)
e := stripErr(err)
str := fmt.Sprintf("Failed to %s %s => %s: %v\n", p.wop.verb(), fobj.path, p.bck.Cname(fobj.dstName), e)
if u.showProgress {
u.errSb.WriteString(str)
} else {
fmt.Fprint(c.App.Writer, str)
fmt.Fprint(c.App.ErrWriter, str)
}
u.errCount.Inc()
} else if u.verbose && !u.showProgress && !p.dryRun {
Expand Down Expand Up @@ -361,6 +391,7 @@ func putRegular(c *cli.Context, bck cmn.Bck, objName, path string, finfo os.File
progress *mpb.Progress
bars []*mpb.Bar
cksum *cos.Cksum
retries = 1
)
if flagIsSet(c, dryRunFlag) {
// resulting message printed upon return
Expand Down Expand Up @@ -391,10 +422,34 @@ func putRegular(c *cli.Context, bck cmn.Bck, objName, path string, finfo os.File
Cksum: cksum,
SkipVC: flagIsSet(c, skipVerCksumFlag),
}
_, err = api.PutObject(&putArgs)

if flagIsSet(c, putRetriesFlag) {
retries = max(parseIntFlag(c, putRetriesFlag), 1)
}

for i := range retries {
_, err = api.PutObject(&putArgs)
if err == nil {
if i > 0 {
fmt.Fprintf(c.App.Writer, "[#%d] %s - done.\n", i+1, path)
}
break
}
e := stripErr(err)
if i < retries-1 {
s := fmt.Sprintf("[#%d] %s: %v - retrying...", i+1, path, e)
fmt.Fprintln(c.App.ErrWriter, s)
time.Sleep(time.Second)
putArgs.Reader, err = fh.Open()
if isTimeout(e) {
putArgs.BaseParams.Client.Timeout = longClientTimeout
}
}
}
if progress != nil {
progress.Wait()
}

return err
}

Expand Down

0 comments on commit 99b7a96

Please sign in to comment.