Skip to content

Commit

Permalink
bucket summary: fix _begin_ timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jun 12, 2024
1 parent 7c4dd1c commit bc1ee69
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 9 deletions.
8 changes: 5 additions & 3 deletions ais/htrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ To troubleshoot:
4. restart %s
-----------------`

const dfltDetail = "[control-plane]"

// extra or extended state - currently, target only
type htext interface {
interruptedRestarted() (bool, bool)
Expand Down Expand Up @@ -602,13 +604,13 @@ func (h *htrun) call(args *callArgs, smap *smapX) (res *callResult) {
if res.err != nil {
break
}
client = g.client.control
client = g.client.control // timeout = config.Client.Timeout ("client.client_timeout")
case apc.LongTimeout:
req, res.err = args.req.Req()
if res.err != nil {
break
}
client = g.client.data
client = g.client.data // timeout = config.Client.TimeoutLong ("client.client_long_timeout")
default:
var cancel context.CancelFunc
if args.timeout == 0 {
Expand Down Expand Up @@ -648,7 +650,7 @@ func (h *htrun) call(args *callArgs, smap *smapX) (res *callResult) {

resp, res.err = client.Do(req)
if res.err != nil {
res.details = "[control-plane]" // tcp level, e.g.: connection refused
res.details = dfltDetail // tcp level, e.g.: connection refused
return
}
defer resp.Body.Close()
Expand Down
6 changes: 6 additions & 0 deletions ais/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2243,6 +2243,9 @@ func (p *proxy) lsObjsA(bck *meta.Bck, lsmsg *apc.LsoMsg) (allEntries *cmn.LsoRe
freeBcArgs(args)
for _, res := range results {
if res.err != nil {
if res.details == "" || res.details == dfltDetail {
res.details = xact.Cname(apc.ActList, lsmsg.UUID)
}
err = res.toErr()
freeBcastRes(results)
return nil, err
Expand Down Expand Up @@ -2357,6 +2360,9 @@ func (p *proxy) lsObjsR(bck *meta.Bck, lsmsg *apc.LsoMsg, hdr http.Header, smap
resLists := make([]*cmn.LsoRes, 0, len(results))
for _, res := range results {
if res.err != nil {
if res.details == "" || res.details == dfltDetail {
res.details = xact.Cname(apc.ActList, lsmsg.UUID)
}
err := res.toErr()
freeBcastRes(results)
return nil, err
Expand Down
23 changes: 18 additions & 5 deletions ais/prxbsumm.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/core/meta"
"github.com/NVIDIA/aistore/xact"
)

// in this source:
Expand Down Expand Up @@ -60,22 +61,30 @@ func (p *proxy) bsummNew(qbck *cmn.QueryBcks, msg *apc.BsummCtrlMsg) (err error)
Query: q,
Body: cos.MustMarshal(aisMsg),
}
// not using default control-plane timeout -
// returning only _after_ all targets start running this new job
// (see Run() in nsumm.go)
args.timeout = apc.DefaultTimeout

args.smap = p.owner.smap.get()
if cnt := args.smap.CountActiveTs(); cnt < 1 {
return cmn.NewErrNoNodes(apc.Target, args.smap.CountTargets())
}
results := p.bcastGroup(args)
for _, res := range results {
if res.err != nil {
if res.details == "" || res.details == dfltDetail {
res.details = xact.Cname(apc.ActSummaryBck, msg.UUID)
}
err = res.toErr()
break
}
}
freeBcastRes(results)
return
return err
}

func (p *proxy) bsummCollect(qbck *cmn.QueryBcks, msg *apc.BsummCtrlMsg) (_ cmn.AllBsummResults, status int, _ error) {
func (p *proxy) bsummCollect(qbck *cmn.QueryBcks, msg *apc.BsummCtrlMsg) (_ cmn.AllBsummResults, status int, err error) {
var (
q = make(url.Values, 4)
aisMsg = p.newAmsgActVal(apc.ActSummaryBck, msg)
Expand All @@ -99,8 +108,12 @@ func (p *proxy) bsummCollect(qbck *cmn.QueryBcks, msg *apc.BsummCtrlMsg) (_ cmn.
freeBcArgs(args)
for _, res := range results {
if res.err != nil {
if res.details == "" || res.details == dfltDetail {
res.details = xact.Cname(apc.ActSummaryBck, msg.UUID)
}
err = res.toErr()
freeBcastRes(results)
return nil, 0, res.toErr()
return nil, 0, err
}
}

Expand Down Expand Up @@ -148,11 +161,11 @@ func (p *proxy) bsummhead(bck *meta.Bck, msg *apc.BsummCtrlMsg) (info *cmn.Bsumm
if err = p.bsummNew(qbck, msg); err == nil {
status = http.StatusAccepted
}
return
return info, status, err
}
summaries, status, err = p.bsummCollect(qbck, msg)
if err == nil && (status == http.StatusOK || status == http.StatusPartialContent) {
info = summaries[0]
}
return
return info, status, err
}
2 changes: 1 addition & 1 deletion xact/xreg/xreg.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (e *entries) getAllRunning(inout *core.AllRunningInOut, periodic bool) {
continue
}
var (
xqn = k + xact.LeftID + xctn.ID() + xact.RightID // e.g. "make-n-copies[fGhuvvn7t]"
xqn = xctn.Cname() // e.g. "make-n-copies[fGhuvvn7t]"
isIdle bool
)
if inout.Idle != nil {
Expand Down

0 comments on commit bc1ee69

Please sign in to comment.