Skip to content

Commit

Permalink
follow-up
Browse files Browse the repository at this point in the history
* python: prefetch w/ num-workers
* up cli
* close/reopen EC streams: negative timeout (fix)
* list-range xactions: minor ref

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Sep 13, 2024
1 parent a5a3024 commit e3b6463
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 43 deletions.
6 changes: 4 additions & 2 deletions ais/prxec.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ func (p *proxy) _recvActiveEC(hdr http.Header, now int64) {
return
}
// check if time has come to close (easy checks first)
if p.ec.rust == 0 || time.Duration(now-p.ec.rust) < cmn.Rom.EcStreams() {
tout := cmn.Rom.EcStreams()
if p.ec.rust == 0 || tout < 0 || time.Duration(now-p.ec.rust) < tout {
return
}
last := p.ec.last.Load()
if last == 0 || time.Duration(now-last) < cmn.Rom.EcStreams() {
if last == 0 || time.Duration(now-last) < tout {
return
}

Expand All @@ -80,6 +81,7 @@ func (p *proxy) _respActiveEC(hdr http.Header, now int64) {
tout := cmn.Rom.EcStreams()
last := p.ec.last.Load()
if last != 0 && time.Duration(now-last) < tout {
debug.Assert(tout > 0)
hdr.Set(apc.HdrActiveEC, "true")
}
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/NVIDIA/aistore/cmd/cli
go 1.22.3

require (
github.com/NVIDIA/aistore v1.3.24-0.20240913135903-a3912a7b3c07
github.com/NVIDIA/aistore v1.3.24-0.20240913143303-a5a30247de42
github.com/fatih/color v1.17.0
github.com/json-iterator/go v1.1.12
github.com/onsi/ginkgo/v2 v2.20.0
Expand Down
4 changes: 2 additions & 2 deletions cmd/cli/go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/NVIDIA/aistore v1.3.24-0.20240913135903-a3912a7b3c07 h1:cQnI9RQ+4qdN4co4Vr60WmNIPuAAede//MVtGooZVkU=
github.com/NVIDIA/aistore v1.3.24-0.20240913135903-a3912a7b3c07/go.mod h1:si83S9r29vwIC0f0CE2Mk+25bFiaN6mmVlmuBpP4hHM=
github.com/NVIDIA/aistore v1.3.24-0.20240913143303-a5a30247de42 h1:LOn+urZYj/Elby2O91Ig0FlZqCNduvr/blM8GV5dVME=
github.com/NVIDIA/aistore v1.3.24-0.20240913143303-a5a30247de42/go.mod h1:si83S9r29vwIC0f0CE2Mk+25bFiaN6mmVlmuBpP4hHM=
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
Expand Down
3 changes: 3 additions & 0 deletions python/aistore/sdk/multiobj/object_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ def evict(self):
def prefetch(
self,
blob_threshold: int = None,
num_workers: int = None,
latest: bool = False,
continue_on_error: bool = False,
):
Expand All @@ -182,6 +183,7 @@ def prefetch(
Args:
latest (bool, optional): GET the latest object version from the associated remote bucket
continue_on_error (bool, optional): Whether to continue if there is an error prefetching a single object
num_workers (int, optional): Number of concurrent workers; number of target mountpaths if omitted or zero
blob_threshold (int, optional): Utilize built-in blob-downloader for remote objects
greater than the specified (threshold) size in bytes
Expand All @@ -204,6 +206,7 @@ def prefetch(
continue_on_err=continue_on_error,
latest=latest,
blob_threshold=blob_threshold,
num_workers=num_workers,
).as_dict()

return self.bck.make_request(
Expand Down
6 changes: 3 additions & 3 deletions xact/xs/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type (
}
archtask struct {
wi *archwi
lrit *lriterator
lrit *lrit
}
jogger struct {
mpath *fs.Mountpath
Expand Down Expand Up @@ -243,7 +243,7 @@ func (r *XactArch) Do(msg *cmn.ArchiveBckMsg) {
r.DecPending()
r.cleanup()
}
var lrit = &lriterator{}
var lrit = &lrit{}

// lrpWorkersNone since we need a single writer to serialize adding files
// into an eventual `archlom`
Expand Down Expand Up @@ -543,7 +543,7 @@ func (wi *archwi) openTarForAppend() (err error) {
}

// multi-object iterator i/f: "handle work item"
func (wi *archwi) do(lom *core.LOM, lrit *lriterator) {
func (wi *archwi) do(lom *core.LOM, lrit *lrit) {
var coldGet bool
if err := lom.Load(false /*cache it*/, false /*locked*/); err != nil {
if !cos.IsNotExist(err, 0) {
Expand Down
10 changes: 5 additions & 5 deletions xact/xs/evict.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type (
kind string
}
evictDelete struct {
lriterator
lrit
xact.Base
config *cmn.Config
}
Expand Down Expand Up @@ -57,7 +57,7 @@ func (*evdFactory) WhenPrevIsRunning(xreg.Renewable) (xreg.WPR, error) {

func newEvictDelete(xargs *xreg.Args, kind string, bck *meta.Bck, msg *apc.ListRange) (ed *evictDelete, err error) {
ed = &evictDelete{config: cmn.GCO.Get()}
if err = ed.lriterator.init(ed, msg, bck, lrpWorkersDflt); err != nil {
if err = ed.lrit.init(ed, msg, bck, lrpWorkersDflt); err != nil {
return nil, err
}
ed.InitBase(xargs.UUID, kind, bck)
Expand All @@ -66,15 +66,15 @@ func newEvictDelete(xargs *xreg.Args, kind string, bck *meta.Bck, msg *apc.ListR

func (r *evictDelete) Run(wg *sync.WaitGroup) {
wg.Done()
err := r.lriterator.run(r, core.T.Sowner().Get())
err := r.lrit.run(r, core.T.Sowner().Get())
if err != nil {
r.AddErr(err, 5, cos.SmoduleXs) // duplicated?
}
r.lriterator.wait()
r.lrit.wait()
r.Finish()
}

func (r *evictDelete) do(lom *core.LOM, lrit *lriterator) {
func (r *evictDelete) do(lom *core.LOM, lrit *lrit) {
ecode, err := core.T.DeleteObject(lom, r.Kind() == apc.ActEvictObjects)
if err == nil { // done
r.ObjsAdd(1, lom.Lsize(true))
Expand Down
30 changes: 15 additions & 15 deletions xact/xs/lrit.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ const (
type (
// one multi-object operation work item
lrwi interface {
do(*core.LOM, *lriterator)
do(*core.LOM, *lrit)
}
// a strict subset of core.Xact, includes only the methods
// lriterator needs for itself
// lrit needs for itself
lrxact interface {
IsAborted() bool
Finished() bool
Expand All @@ -62,11 +62,11 @@ type (
wi lrwi
}
lrworker struct {
lrit *lriterator
lrit *lrit
}

// common multi-object operation context and list|range|prefix logic
lriterator struct {
lrit struct {
parent lrxact
msg *apc.ListRange
bck *meta.Bck
Expand Down Expand Up @@ -99,10 +99,10 @@ var (
)

////////////////
// lriterator //
// lrit //
////////////////

func (r *lriterator) init(xctn lrxact, msg *apc.ListRange, bck *meta.Bck, numWorkers int) error {
func (r *lrit) init(xctn lrxact, msg *apc.ListRange, bck *meta.Bck, numWorkers int) error {
var (
avail = fs.GetAvail()
l = len(avail)
Expand Down Expand Up @@ -165,7 +165,7 @@ func (r *lriterator) init(xctn lrxact, msg *apc.ListRange, bck *meta.Bck, numWor
// - "copy entire source bucket", or even
// - "archive entire bucket as a single shard" (caution!)

func (r *lriterator) _inipr(msg *apc.ListRange) error {
func (r *lrit) _inipr(msg *apc.ListRange) error {
pt, err := cos.NewParsedTemplate(msg.Template)
if err != nil {
if err == cos.ErrEmptyTemplate {
Expand All @@ -189,9 +189,9 @@ pref:
return nil
}

func (r *lriterator) numWorkers() int { return len(r.workers) }
func (r *lrit) numWorkers() int { return len(r.workers) }

func (r *lriterator) run(wi lrwi, smap *meta.Smap) (err error) {
func (r *lrit) run(wi lrwi, smap *meta.Smap) (err error) {
for _, worker := range r.workers {
r.wg.Add(1)
go worker.run()
Expand All @@ -207,17 +207,17 @@ func (r *lriterator) run(wi lrwi, smap *meta.Smap) (err error) {
return err
}

func (r *lriterator) wait() {
func (r *lrit) wait() {
if r.workers == nil {
return
}
close(r.workCh)
r.wg.Wait()
}

func (r *lriterator) done() bool { return r.parent.IsAborted() || r.parent.Finished() }
func (r *lrit) done() bool { return r.parent.IsAborted() || r.parent.Finished() }

func (r *lriterator) _list(wi lrwi, smap *meta.Smap) error {
func (r *lrit) _list(wi lrwi, smap *meta.Smap) error {
r.lrp = lrpList
for _, objName := range r.msg.ObjNames {
if r.done() {
Expand All @@ -236,7 +236,7 @@ func (r *lriterator) _list(wi lrwi, smap *meta.Smap) error {
return nil
}

func (r *lriterator) _range(wi lrwi, smap *meta.Smap) error {
func (r *lrit) _range(wi lrwi, smap *meta.Smap) error {
r.pt.InitIter()
for objName, hasNext := r.pt.Next(); hasNext; objName, hasNext = r.pt.Next() {
if r.done() {
Expand All @@ -256,7 +256,7 @@ func (r *lriterator) _range(wi lrwi, smap *meta.Smap) error {
}

// (compare with ais/plstcx)
func (r *lriterator) _prefix(wi lrwi, smap *meta.Smap) error {
func (r *lrit) _prefix(wi lrwi, smap *meta.Smap) error {
var (
err error
ecode int
Expand Down Expand Up @@ -321,7 +321,7 @@ func (r *lriterator) _prefix(wi lrwi, smap *meta.Smap) error {
return nil
}

func (r *lriterator) do(lom *core.LOM, wi lrwi, smap *meta.Smap) (bool /*this lom done*/, error) {
func (r *lrit) do(lom *core.LOM, wi lrwi, smap *meta.Smap) (bool /*this lom done*/, error) {
if err := lom.InitBck(r.bck.Bucket()); err != nil {
return false, err
}
Expand Down
12 changes: 6 additions & 6 deletions xact/xs/prefetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type (
prefetch struct {
config *cmn.Config
msg *apc.PrefetchMsg
lriterator
lrit
xact.Base
blob struct {
pending []core.Xact
Expand Down Expand Up @@ -89,7 +89,7 @@ func (*prfFactory) WhenPrevIsRunning(xreg.Renewable) (xreg.WPR, error) {
func newPrefetch(xargs *xreg.Args, kind string, bck *meta.Bck, msg *apc.PrefetchMsg) (r *prefetch, err error) {
r = &prefetch{config: cmn.GCO.Get(), msg: msg}

err = r.lriterator.init(r, &msg.ListRange, bck, msg.NumWorkers)
err = r.lrit.init(r, &msg.ListRange, bck, msg.NumWorkers)
if err != nil {
return nil, err
}
Expand All @@ -103,19 +103,19 @@ func newPrefetch(xargs *xreg.Args, kind string, bck *meta.Bck, msg *apc.Prefetch
}

func (r *prefetch) Run(wg *sync.WaitGroup) {
if nw := r.lriterator.numWorkers(); nw > 1 {
if nw := r.lrit.numWorkers(); nw > 1 {
nlog.Infoln(r.Name(), "[", nw, "workers ]")
} else {
nlog.Infoln(r.Name())
}

wg.Done()

err := r.lriterator.run(r, core.T.Sowner().Get())
err := r.lrit.run(r, core.T.Sowner().Get())
if err != nil {
r.AddErr(err, 5, cos.SmoduleXs) // duplicated?
}
r.lriterator.wait()
r.lrit.wait()

// pending blob-downloads
if r.blob.num.Load() > 0 {
Expand All @@ -131,7 +131,7 @@ func (r *prefetch) Run(wg *sync.WaitGroup) {
r.Finish()
}

func (r *prefetch) do(lom *core.LOM, lrit *lriterator) {
func (r *prefetch) do(lom *core.LOM, lrit *lrit) {
var (
err error
size int64
Expand Down
18 changes: 9 additions & 9 deletions xact/xs/tcobjs.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (r *XactTCObjs) Run(wg *sync.WaitGroup) {
case msg := <-r.workCh:
var (
smap = core.T.Sowner().Get()
lrit = &lriterator{}
lrit = &lrit{}
)
debug.Assert(cos.IsValidUUID(msg.TxnUUID), msg.TxnUUID) // (ref050724: in re: ais/plstcx)
r.pending.mtx.Lock()
Expand Down Expand Up @@ -307,7 +307,7 @@ func (r *XactTCObjs) _put(hdr *transport.ObjHdr, objReader io.Reader, lom *core.
// tcowi //
///////////

func (wi *tcowi) do(lom *core.LOM, lrit *lriterator) {
func (wi *tcowi) do(lom *core.LOM, lrit *lrit) {
var (
objNameTo = wi.msg.ToName(lom.ObjName)
buf, slab = core.T.PageMM().Alloc()
Expand Down Expand Up @@ -355,31 +355,31 @@ type syncwi struct {
// interface guard
var _ lrwi = (*syncwi)(nil)

func (r *XactTCObjs) prune(lrit *lriterator, smap *meta.Smap, pt *cos.ParsedTemplate) {
func (r *XactTCObjs) prune(pruneit *lrit, smap *meta.Smap, pt *cos.ParsedTemplate) {
rp := prune{parent: r, smap: smap}
rp.bckFrom, rp.bckTo = r.FromTo()

// tcb use case
if lrit.lrp == lrpPrefix {
rp.prefix = lrit.prefix
if pruneit.lrp == lrpPrefix {
rp.prefix = pruneit.prefix
rp.init(r.config)
rp.run()
rp.wait()
return
}

// same range iterator but different bucket
var syncit lriterator
debug.Assert(lrit.lrp == lrpRange)
var syncit lrit
debug.Assert(pruneit.lrp == lrpRange)

err := syncit.init(lrit.parent, lrit.msg, rp.bckTo, lrpWorkersDflt)
err := syncit.init(pruneit.parent, pruneit.msg, rp.bckTo, lrpWorkersDflt)
debug.AssertNoErr(err)
syncit.pt = pt
syncwi := &syncwi{&rp} // reusing only prune.do (and not init/run/wait)
syncit.run(syncwi, smap)
syncit.wait()
}

func (syncwi *syncwi) do(lom *core.LOM, _ *lriterator) {
func (syncwi *syncwi) do(lom *core.LOM, _ *lrit) {
syncwi.rp.do(lom, nil)
}

0 comments on commit e3b6463

Please sign in to comment.