diff --git a/ais/fspathrgrp.go b/ais/fspathrgrp.go
index afb36ce813..4e4d7790fa 100644
--- a/ais/fspathrgrp.go
+++ b/ais/fspathrgrp.go
@@ -14,7 +14,6 @@ import (
 	"github.com/NVIDIA/aistore/cmn/debug"
 	"github.com/NVIDIA/aistore/cmn/nlog"
 	"github.com/NVIDIA/aistore/core"
-	"github.com/NVIDIA/aistore/ext/dsort"
 	"github.com/NVIDIA/aistore/fs"
 	"github.com/NVIDIA/aistore/ios"
 	"github.com/NVIDIA/aistore/res"
@@ -62,12 +61,6 @@ func (g *fsprungroup) attachMpath(mpath string, label ios.Label) (addedMi *fs.Mo
 }
 
 func (g *fsprungroup) _postAdd(action string, mi *fs.Mountpath) {
-	// NOTE:
-	// - currently, dsort doesn't handle (add/enable/disable/detach mountpath) at runtime
-	// - consider integrating via `xreg.LimitedCoexistence`
-	// - review all xact.IsMountpath(kind) == true
-	dsort.Managers.AbortAll(fmt.Errorf("%q %s", action, mi))
-
 	fspathsConfigAddDel(mi.Path, true /*add*/)
 	go func() {
 		if cmn.GCO.Get().Resilver.Enabled {
@@ -134,10 +127,6 @@ func (g *fsprungroup) doDD(action string, flags uint64, mpath string, dontResilv
 	if err != nil || rmi == nil {
 		return nil, err
 	}
-
-	// NOTE: above
-	dsort.Managers.AbortAll(fmt.Errorf("%q %s", action, rmi))
-
 	if numAvail == 0 {
 		nlog.Errorf("%s: lost (via %q) the last available mountpath %q", g.t.si, action, rmi)
 		g.postDD(rmi, action, nil /*xaction*/, nil /*error*/) // go ahead to disable/detach
diff --git a/ais/prxnotif.go b/ais/prxnotif.go
index 8cb3238710..d7a2ec0ae1 100644
--- a/ais/prxnotif.go
+++ b/ais/prxnotif.go
@@ -353,9 +353,9 @@ func (n *notifs) housekeep(int64) time.Duration {
 	now := time.Now().UnixNano()
 	n.fin.mtx.Lock()
 	for _, nl := range n.fin.m {
-		timeout := hk.DelOldIval
+		timeout := hk.OldAgeNotif
 		if nl.Kind() == apc.ActList {
-			timeout = hk.OldAgeLsoNotif
+			timeout = hk.OldAgeNotifLso
 		}
 		if time.Duration(now-nl.EndTime()) > timeout {
 			n.fin.del(nl, true /*locked*/)
diff --git a/ais/tgtec.go b/ais/tgtec.go
index d2e9b15fb0..e31a649f81 100644
--- a/ais/tgtec.go
+++ b/ais/tgtec.go
@@ -74,9 +74,9 @@ func (t *target) httpecpost(w http.ResponseWriter, r *http.Request) {
 	action := apiItems[0]
 	switch action {
 	case apc.ActEcOpen:
-		ec.ECM.OpenStreams()
+		ec.ECM.OpenStreams(false /*with refc*/)
 	case apc.ActEcClose:
-		ec.ECM.CloseStreams()
+		ec.ECM.CloseStreams(false /*with refc*/)
 	default:
 		t.writeErr(w, r, errActEc(action))
 	}
diff --git a/ais/txn.go b/ais/txn.go
index bd1fc0e3a2..967644738c 100644
--- a/ais/txn.go
+++ b/ais/txn.go
@@ -25,7 +25,6 @@ import (
 
 // GC
 const (
-	gcTxnsInterval   = time.Hour
 	gcTxnsNumKeep    = 16
 	gcTxnsTimeotMult = 10
 
@@ -157,7 +156,7 @@ func (txns *transactions) init(t *target) {
 	txns.t = t
 	txns.m = make(map[string]txn, 8)
 	txns.rendezvous.m = make(map[string]rndzvs, 8)
-	hk.Reg("txn"+hk.NameSuffix, txns.housekeep, gcTxnsInterval)
+	hk.Reg("txn"+hk.NameSuffix, txns.housekeep, hk.DelOldIval)
 }
 
 func (txns *transactions) begin(txn txn, nlps ...core.NLP) (err error) {
@@ -339,15 +338,15 @@ func (txns *transactions) housekeep(int64) (d time.Duration) {
 		orphans []txn
 		config  = cmn.GCO.Get()
 	)
-	d = gcTxnsInterval
+	d = hk.DelOldIval
 	txns.mtx.Lock()
 	l := len(txns.m)
 	if l == 0 {
 		txns.mtx.Unlock()
 		return
 	}
-	if l > max(gcTxnsNumKeep*4, 16) {
-		d = gcTxnsInterval / 10
+	if l > max(gcTxnsNumKeep<<2, 32) {
+		d >>= 2
 	}
 	now := time.Now()
 	for _, txn := range txns.m {
diff --git a/ec/manager.go b/ec/manager.go
index 1c19908dff..4168ab028c 100644
--- a/ec/manager.go
+++ b/ec/manager.go
@@ -62,7 +62,7 @@ func initManager() error {
 	}
 	if ECM.bmd.IsECUsed() {
 		// TODO -- FIXME: remove
-		ECM.OpenStreams()
+		ECM.OpenStreams(false)
 	}
 	return nil
 }
@@ -92,7 +92,10 @@ func cbReq(hdr *transport.ObjHdr, _ io.ReadCloser, _ any, err error) {
 	}
 }
 
-func (mgr *Manager) OpenStreams() {
+func (mgr *Manager) OpenStreams(withRefc bool) {
+	if withRefc {
+		mgr._refc.Inc()
+	}
 	if !mgr.bundleEnabled.CAS(false, true) {
 		return
 	}
@@ -119,7 +122,10 @@
 	mgr.respBundle.Store(bundle.New(client, respSbArgs))
 }
 
-func (mgr *Manager) CloseStreams() {
+func (mgr *Manager) CloseStreams(withRefc bool) {
+	if withRefc {
+		mgr._refc.Dec()
+	}
 	if !mgr.bundleEnabled.CAS(true, false) {
 		return
 	}
@@ -320,9 +326,9 @@ func (mgr *Manager) BMDChanged() error {
 
 	// TODO -- FIXME: remove
 	if newBMD.IsECUsed() && !oldBMD.IsECUsed() {
-		mgr.OpenStreams()
+		mgr.OpenStreams(false)
 	} else if !newBMD.IsECUsed() && oldBMD.IsECUsed() {
-		mgr.CloseStreams()
+		mgr.CloseStreams(false)
 		return nil
 	}
 
diff --git a/ext/dload/dispatcher.go b/ext/dload/dispatcher.go
index 659def6cfc..48ca7f15f7 100644
--- a/ext/dload/dispatcher.go
+++ b/ext/dload/dispatcher.go
@@ -54,6 +54,8 @@ type (
 		// Certification check is disabled for now and does not depend on cluster settings.
 		clientH   *http.Client
 		clientTLS *http.Client
+
+		once sync.Once // newInfoStore upon the first execution
 	}
 )
 
@@ -68,7 +70,6 @@ func Init(tstats stats.Tracker, db kvdb.Driver, clientConf *cmn.ClientConf) {
 	{
 		g.tstats = tstats
 		g.db = db
-		g.store = newInfoStore(db)
 	}
 	xreg.RegNonBckXact(&factory{})
 }
diff --git a/ext/dload/xact.go b/ext/dload/xact.go
index f1e48317d9..4c85aac798 100644
--- a/ext/dload/xact.go
+++ b/ext/dload/xact.go
@@ -169,6 +169,11 @@ func (*factory) New(args xreg.Args, _ *meta.Bck) xreg.Renewable {
 func (p *factory) Start() error {
 	xdl := newXact(p)
 	p.xctn = xdl
+
+	g.once.Do(func() {
+		g.store = newInfoStore(g.db)
+	})
+
 	go xdl.Run(nil)
 	return nil
 }
diff --git a/ext/dsort/dsort.go b/ext/dsort/dsort.go
index c9421ae13d..4653c3630d 100644
--- a/ext/dsort/dsort.go
+++ b/ext/dsort/dsort.go
@@ -467,7 +467,7 @@ func (m *Manager) participateInRecordDistribution(targetOrder meta.Nodes) (curre
 	)
 	group.Go(func() error {
 		var (
-			buf, slab = g.mm.AllocSize(serializationBufSize)
+			buf, slab = g.mem.AllocSize(serializationBufSize)
 			msgpw     = msgp.NewWriterBuf(w, buf)
 		)
 		defer slab.Free(buf)
@@ -861,7 +861,7 @@ func (m *Manager) _dist(si *meta.Snode, s []*shard.Shard, order map[string]*shar
 	)
 	group.Go(func() error {
 		var (
-			buf, slab = g.mm.AllocSize(serializationBufSize)
+			buf, slab = g.mem.AllocSize(serializationBufSize)
 			msgpw     = msgp.NewWriterBuf(w, buf)
 			md        = &CreationPhaseMetadata{Shards: s, SendOrder: order}
 		)
diff --git a/ext/dsort/dsort_general.go b/ext/dsort/dsort_general.go
index d063a6fc6f..9a58097ce7 100644
--- a/ext/dsort/dsort_general.go
+++ b/ext/dsort/dsort_general.go
@@ -282,7 +282,7 @@ func (ds *dsorterGeneral) loadLocal(w io.Writer, obj *shard.RecordObj) (written
 	)
 
 	if storeType != shard.SGLStoreType { // SGL does not need buffer as it is buffer itself
-		buf, slab = g.mm.AllocSize(obj.Size)
+		buf, slab = g.mem.AllocSize(obj.Size)
 	}
 
 	defer func() {
@@ -556,7 +556,7 @@ func (ds *dsorterGeneral) recvResp(hdr *transport.ObjHdr, object io.Reader, err
 		return nil
 	}
 
-	buf, slab := g.mm.AllocSize(hdr.ObjAttrs.Size)
+	buf, slab := g.mem.AllocSize(hdr.ObjAttrs.Size)
 	writer.n, writer.err = io.CopyBuffer(writer.w, object, buf)
 	writer.wg.Done()
 	slab.Free(buf)
diff --git a/ext/dsort/dsort_mem.go b/ext/dsort/dsort_mem.go
index 973d6e2f5f..f9b81baf09 100644
--- a/ext/dsort/dsort_mem.go
+++ b/ext/dsort/dsort_mem.go
@@ -138,7 +138,7 @@ func (c *rwConnector) connectReader(key string, r io.Reader, size int64) (err er
 	c.mu.Unlock()
 
 	if !all {
-		rw.sgl = g.mm.NewSGL(size)
+		rw.sgl = g.mem.NewSGL(size)
 		_, err = io.Copy(rw.sgl, r)
 		rw.wgr.Done()
 		return
diff --git a/ext/dsort/handler.go b/ext/dsort/handler.go
index f91f34ea4f..53b382600d 100644
--- a/ext/dsort/handler.go
+++ b/ext/dsort/handler.go
@@ -405,9 +405,9 @@ func TargetHandler(w http.ResponseWriter, r *http.Request) {
 	case apc.Start:
 		tstartHandler(w, r)
 	case apc.Records:
-		Managers.recordsHandler(w, r)
+		g.mg.recordsHandler(w, r)
 	case apc.Shards:
-		Managers.shardsHandler(w, r)
+		g.mg.shardsHandler(w, r)
 	case apc.Abort:
 		tabortHandler(w, r)
 	case apc.Remove:
@@ -455,7 +455,7 @@ func tinitHandler(w http.ResponseWriter, r *http.Request) {
 	}
 	managerUUID := apiItems[0]
-	m, err := Managers.Add(managerUUID) // NOTE: returns manager locked iff err == nil
+	m, err := g.mg.Add(managerUUID) // NOTE: returns manager locked iff err == nil
 	if err != nil {
 		cmn.WriteErr(w, r, err)
 		return
 	}
@@ -491,7 +491,7 @@ func tstartHandler(w http.ResponseWriter, r *http.Request) {
 	}
 	managerUUID := apiItems[0]
 
-	m, exists := Managers.Get(managerUUID, false /*incl. archived*/)
+	m, exists := g.mg.Get(managerUUID, false /*incl. archived*/)
 	if !exists {
 		s := fmt.Sprintf("invalid request: job %q does not exist", managerUUID)
 		cmn.WriteErrMsg(w, r, s, http.StatusNotFound)
@@ -536,7 +536,7 @@ func (m *Manager) errHandler(err error) {
 // shardsHandler is the handler for the HTTP endpoint /v1/sort/shards.
 // A valid POST to this endpoint results in a new shard being created locally based on the contents
 // of the incoming request body. The shard is then sent to the correct target in the cluster as per HRW.
-func (managers *ManagerGroup) shardsHandler(w http.ResponseWriter, r *http.Request) {
+func (mg *managerGroup) shardsHandler(w http.ResponseWriter, r *http.Request) {
 	if !checkHTTPMethod(w, r, http.MethodPost) {
 		return
 	}
@@ -545,7 +545,7 @@ func (managers *ManagerGroup) shardsHandler(w http.ResponseWriter, r *http.Reque
 		return
 	}
 	managerUUID := apiItems[0]
-	m, exists := managers.Get(managerUUID, false /*incl. archived*/)
+	m, exists := mg.Get(managerUUID, false /*incl. archived*/)
 	if !exists {
 		s := fmt.Sprintf("invalid request: job %q does not exist", managerUUID)
 		cmn.WriteErrMsg(w, r, s, http.StatusNotFound)
@@ -562,7 +562,7 @@ func (managers *ManagerGroup) shardsHandler(w http.ResponseWriter, r *http.Reque
 	}
 
 	var (
-		buf, slab   = g.mm.AllocSize(serializationBufSize)
+		buf, slab   = g.mem.AllocSize(serializationBufSize)
 		tmpMetadata = &CreationPhaseMetadata{}
 	)
 	defer slab.Free(buf)
@@ -585,7 +585,7 @@ func (managers *ManagerGroup) shardsHandler(w http.ResponseWriter, r *http.Reque
 // recordsHandler is the handler /v1/sort/records.
 // A valid POST to this endpoint updates this target's dsortManager.Records with the
 // []Records from the request body, along with some related state variables.
-func (managers *ManagerGroup) recordsHandler(w http.ResponseWriter, r *http.Request) {
+func (mg *managerGroup) recordsHandler(w http.ResponseWriter, r *http.Request) {
 	if !checkHTTPMethod(w, r, http.MethodPost) {
 		return
 	}
@@ -594,7 +594,7 @@ func (managers *ManagerGroup) recordsHandler(w http.ResponseWriter, r *http.Requ
 		return
 	}
 	managerUUID := apiItems[0]
-	m, exists := managers.Get(managerUUID, false /*incl. archived*/)
+	m, exists := mg.Get(managerUUID, false /*incl. archived*/)
 	if !exists {
 		s := fmt.Sprintf("invalid request: job %q does not exist", managerUUID)
 		cmn.WriteErrMsg(w, r, s, http.StatusNotFound)
@@ -633,7 +633,7 @@ func (managers *ManagerGroup) recordsHandler(w http.ResponseWriter, r *http.Requ
 	}
 
 	var (
-		buf, slab = g.mm.AllocSize(serializationBufSize)
+		buf, slab = g.mem.AllocSize(serializationBufSize)
 		records   = shard.NewRecords(int(d))
 	)
 	defer slab.Free(buf)
@@ -669,7 +669,7 @@ func tabortHandler(w http.ResponseWriter, r *http.Request) {
 	}
 	managerUUID := apiItems[0]
 
-	m, exists := Managers.Get(managerUUID, true /*incl. archived*/)
+	m, exists := g.mg.Get(managerUUID, true /*incl. archived*/)
 	if !exists {
 		s := fmt.Sprintf("%s: [dsort] %s does not exist", core.T, managerUUID)
 		cmn.WriteErrMsg(w, r, s, http.StatusNotFound)
@@ -695,7 +695,7 @@ func tremoveHandler(w http.ResponseWriter, r *http.Request) {
 	}
 	managerUUID := apiItems[0]
 
-	if err := Managers.Remove(managerUUID); err != nil {
+	if err := g.mg.Remove(managerUUID); err != nil {
 		cmn.WriteErr(w, r, err)
 		return
 	}
@@ -719,7 +719,7 @@ func tlistHandler(w http.ResponseWriter, r *http.Request) {
 		}
 	}
 
-	w.Write(cos.MustMarshal(Managers.List(regex, onlyActive)))
+	w.Write(cos.MustMarshal(g.mg.List(regex, onlyActive)))
 }
 
 // /v1/sort/metrics.
@@ -734,7 +734,7 @@ func tmetricsHandler(w http.ResponseWriter, r *http.Request) {
 	}
 	managerUUID := apiItems[0]
 
-	m, exists := Managers.Get(managerUUID, true /*incl. archived*/)
+	m, exists := g.mg.Get(managerUUID, true /*incl. archived*/)
 	if !exists {
 		s := fmt.Sprintf("%s: [dsort] %s does not exist", core.T, managerUUID)
 		cmn.WriteErrMsg(w, r, s, http.StatusNotFound)
@@ -763,7 +763,7 @@ func tfiniHandler(w http.ResponseWriter, r *http.Request) {
 	}
 	managerUUID, tid := apiItems[0], apiItems[1]
 
-	m, exists := Managers.Get(managerUUID, false /*incl. archived*/)
+	m, exists := g.mg.Get(managerUUID, false /*incl. archived*/)
 	if !exists {
 		s := fmt.Sprintf("invalid request: job %q does not exist", managerUUID)
 		cmn.WriteErrMsg(w, r, s, http.StatusNotFound)
diff --git a/ext/dsort/manager.go b/ext/dsort/manager.go
index 5ebac736b1..7df6442d34 100644
--- a/ext/dsort/manager.go
+++ b/ext/dsort/manager.go
@@ -54,7 +54,11 @@ const (
 type (
 	global struct {
 		tstats stats.Tracker
-		mm     *memsys.MMSA
+		mem    *memsys.MMSA
+
+		// internal
+		mg   *managerGroup
+		once sync.Once // reg housekeep upon the first usage
 	}
 	buildingShardInfo struct {
 		shardName string
@@ -80,7 +84,7 @@ type (
 		Metrics *Metrics       `json:"metrics"`
 		Pars    *parsedReqSpec `json:"pars"`
 
-		mg   *ManagerGroup // parent
+		mg   *managerGroup // parent
 		mu   sync.Mutex
 		smap *meta.Smap
 		recm *shard.RecordManager
@@ -134,14 +138,14 @@ func Pinit(si core.Node, config *cmn.Config) {
 }
 
 func Tinit(tstats stats.Tracker, db kvdb.Driver, config *cmn.Config) {
-	Managers = NewManagerGroup(db, false)
+	g.mg = newManagerGroup(db)
 
 	xreg.RegBckXact(&factory{})
 
-	debug.Assert(g.mm == nil) // only once
+	debug.Assert(g.mem == nil) // only once
 	{
 		g.tstats = tstats
-		g.mm = core.T.PageMM()
+		g.mem = core.T.PageMM()
 	}
 	fs.CSM.Reg(ct.DsortFileType, &ct.DsortFile{})
 	fs.CSM.Reg(ct.DsortWorkfileType, &ct.DsortFile{})
diff --git a/ext/dsort/manager_group.go b/ext/dsort/manager_group.go
index f2a66d195f..cd3ae2e8ba 100644
--- a/ext/dsort/manager_group.go
+++ b/ext/dsort/manager_group.go
@@ -25,31 +25,25 @@ const (
 	managersKey = "managers"
 )
 
-var Managers *ManagerGroup
-
-// ManagerGroup abstracts multiple dsort managers into single struct.
-type ManagerGroup struct {
+// managerGroup abstracts multiple dsort managers into single struct.
+type managerGroup struct {
 	mtx      sync.Mutex // Synchronizes reading managers field and db access
 	managers map[string]*Manager
 	db       kvdb.Driver
 }
 
-// NewManagerGroup returns new, initialized manager group.
-func NewManagerGroup(db kvdb.Driver, skipHk bool) *ManagerGroup {
-	mg := &ManagerGroup{
+func newManagerGroup(db kvdb.Driver) *managerGroup {
+	mg := &managerGroup{
 		managers: make(map[string]*Manager, 1),
 		db:       db,
 	}
-	if !skipHk {
-		hk.Reg(apc.ActDsort+hk.NameSuffix, mg.housekeep, hk.DayInterval)
-	}
 	return mg
 }
 
 // Add new, non-initialized manager with given managerUUID to manager group.
 // Returned manager is locked, it's caller responsibility to unlock it.
 // Returns error when manager with specified managerUUID already exists.
-func (mg *ManagerGroup) Add(managerUUID string) (*Manager, error) {
+func (mg *managerGroup) Add(managerUUID string) (*Manager, error) {
 	mg.mtx.Lock()
 	defer mg.mtx.Unlock()
 	if _, exists := mg.managers[managerUUID]; exists {
@@ -64,7 +58,7 @@ func (mg *ManagerGroup) Add(managerUUID string) (*Manager, error) {
 	return manager, nil
 }
 
-func (mg *ManagerGroup) List(descRegex *regexp.Regexp, onlyActive bool) []JobInfo {
+func (mg *managerGroup) List(descRegex *regexp.Regexp, onlyActive bool) []JobInfo {
 	mg.mtx.Lock()
 	defer mg.mtx.Unlock()
 
@@ -113,7 +107,7 @@ func (mg *ManagerGroup) List(descRegex *regexp.Regexp, onlyActive bool) []JobInf
 // exist and user requested persisted lookup, it looks for it in persistent
 // storage and returns it if found. Returns false if does not exist, true
 // otherwise.
-func (mg *ManagerGroup) Get(managerUUID string, inclArchived bool) (*Manager, bool) {
+func (mg *managerGroup) Get(managerUUID string, inclArchived bool) (*Manager, bool) {
 	mg.mtx.Lock()
 	defer mg.mtx.Unlock()
 
@@ -132,7 +126,7 @@ func (mg *ManagerGroup) Get(managerUUID string, inclArchived bool) (*Manager, bo
 }
 
 // Remove the managerUUID from history. Used for reducing clutter. Fails if process hasn't been cleaned up.
-func (mg *ManagerGroup) Remove(managerUUID string) error {
+func (mg *managerGroup) Remove(managerUUID string) error {
 	mg.mtx.Lock()
 	defer mg.mtx.Unlock()
 
@@ -153,7 +147,7 @@ func (mg *ManagerGroup) Remove(managerUUID string) error {
 //
 // When error occurs during moving manager to persistent storage, manager is not
 // removed from memory.
-func (mg *ManagerGroup) persist(managerUUID string) {
+func (mg *managerGroup) persist(managerUUID string) {
 	mg.mtx.Lock()
 	defer mg.mtx.Unlock()
 	manager, exists := mg.managers[managerUUID]
@@ -170,16 +164,7 @@ func (mg *ManagerGroup) persist(managerUUID string) {
 	delete(mg.managers, managerUUID)
 }
 
-func (mg *ManagerGroup) AbortAll(err error) {
-	mg.mtx.Lock()
-	defer mg.mtx.Unlock()
-
-	for _, manager := range mg.managers {
-		manager.abort(err)
-	}
-}
-
-func (mg *ManagerGroup) housekeep(int64) time.Duration {
+func (mg *managerGroup) housekeep(int64) time.Duration {
 	const (
 		retryInterval   = time.Hour // retry interval in case error occurred
 		regularInterval = hk.DayInterval
diff --git a/ext/dsort/mem_watcher.go b/ext/dsort/mem_watcher.go
index 7f858dbc61..7493ab181a 100644
--- a/ext/dsort/mem_watcher.go
+++ b/ext/dsort/mem_watcher.go
@@ -125,7 +125,7 @@ func (mw *memoryWatcher) watchReserved() {
 func (mw *memoryWatcher) watchExcess(memStat sys.MemStat) {
 	defer mw.excess.wg.Done()
 
-	buf, slab := g.mm.Alloc()
+	buf, slab := g.mem.Alloc()
 	defer slab.Free(buf)
 
 	lastMemoryUsage := memStat.ActualUsed
diff --git a/ext/dsort/xact.go b/ext/dsort/xact.go
index 83baa0c167..ca1288d45f 100644
--- a/ext/dsort/xact.go
+++ b/ext/dsort/xact.go
@@ -11,6 +11,7 @@ import (
 	"github.com/NVIDIA/aistore/cmn/debug"
 	"github.com/NVIDIA/aistore/core"
 	"github.com/NVIDIA/aistore/core/meta"
+	"github.com/NVIDIA/aistore/hk"
 	"github.com/NVIDIA/aistore/xact"
 	"github.com/NVIDIA/aistore/xact/xreg"
 )
@@ -40,6 +41,11 @@ func (p *factory) Start() error {
 	debug.Assert(ok)
 	p.xctn = &xaction{args: args}
 	p.xctn.InitBase(p.UUID(), apc.ActDsort, args.BckTo /*compare w/ tcb and tco*/)
+
+	g.once.Do(func() {
+		hk.Reg(apc.ActDsort+hk.NameSuffix, g.mg.housekeep, hk.DayInterval)
+	})
+
 	return nil
 }
 
@@ -60,7 +66,7 @@ func (*xaction) Run(*sync.WaitGroup) { debug.Assert(false) }
 // - Manager.abort(errs ...error) legacy, and
 // - xaction.Abort, to implement the corresponding interface and uniformly support `api.AbortXaction`
 func (r *xaction) Abort(err error) (ok bool) {
-	m, exists := Managers.Get(r.ID(), false /*incl. archived*/)
+	m, exists := g.mg.Get(r.ID(), false /*incl. archived*/)
 	if !exists {
 		return
 	}
@@ -75,7 +81,7 @@ func (r *xaction) Snap() (snap *core.Snap) {
 	snap = &core.Snap{}
 	r.ToSnap(snap)
 
-	m, exists := Managers.Get(r.ID(), true /*incl. archived*/)
+	m, exists := g.mg.Get(r.ID(), true /*incl. archived*/)
 	if !exists {
 		return
 	}
diff --git a/hk/common_durations.go b/hk/common_durations.go
index 39ad0fd133..f1276f1014 100644
--- a/hk/common_durations.go
+++ b/hk/common_durations.go
@@ -1,7 +1,7 @@
 // Package hk provides mechanism for registering cleanup
 // functions which are invoked at specified intervals.
 /*
- * Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
+ * Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
  */
 package hk
 
@@ -10,10 +10,14 @@ import "time"
 
 // common cleanup-related durations
 const (
-	DelOldIval      = 8 * time.Minute // hk interval: cleanup old
-	PruneActiveIval = 2 * time.Minute // hk interval: prune active
+	DelOldIval      = 24 * time.Minute // hk-cleanup old xactions; old transactions
+	PruneActiveIval = 2 * time.Minute  // hk-prune active xactions; cleanup notifs
 
-	OldAgeLso      = time.Minute      // when list-objects is considered old
-	OldAgeLsoNotif = 10 * time.Second // notifications-wise
-	OldAgeX        = time.Hour        // all other X
+	//
+	// when things are considered _old_
+	//
+	OldAgeLso      = time.Minute      // list-objects
+	OldAgeNotif    = 3 * time.Minute  // old notifications
+	OldAgeNotifLso = 10 * time.Second // ditto lso
+	OldAgeX        = time.Hour        // xactions
 )
diff --git a/reb/globrun.go b/reb/globrun.go
index 639303b7ae..d0d153b0c2 100644
--- a/reb/globrun.go
+++ b/reb/globrun.go
@@ -25,6 +25,7 @@ import (
 	"github.com/NVIDIA/aistore/cmn/prob"
 	"github.com/NVIDIA/aistore/core"
 	"github.com/NVIDIA/aistore/core/meta"
+	"github.com/NVIDIA/aistore/ec"
 	"github.com/NVIDIA/aistore/fs"
 	"github.com/NVIDIA/aistore/transport"
 	"github.com/NVIDIA/aistore/transport/bundle"
@@ -185,7 +186,7 @@ func (reb *Reb) RunRebalance(smap *meta.Smap, id int64, notif *xact.NotifXact, t
 	reb.mu.Unlock()
 
 	logHdr := reb.logHdr(id, smap, true /*initializing*/)
-	nlog.Infoln(logHdr + ": initializing")
+	nlog.Infoln(logHdr, "initializing")
 
 	bmd := core.T.Bowner().Get()
 	rargs := &rebArgs{id: id, smap: smap, config: cmn.GCO.Get(), ecUsed: bmd.IsECUsed()}
@@ -221,6 +222,9 @@ func (reb *Reb) RunRebalance(smap *meta.Smap, id int64, notif *xact.NotifXact, t
 
 	// At this point, only one rebalance is running
 
+	if rargs.ecUsed {
+		ec.ECM.OpenStreams(true /*with refc*/)
+	}
 	onGFN()
 
 	tstats.SetFlag(cos.NodeAlerts, cos.Rebalancing)
@@ -242,6 +246,9 @@ func (reb *Reb) RunRebalance(smap *meta.Smap, id int64, notif *xact.NotifXact, t
 
 	tstats.ClrFlag(cos.NodeAlerts, cos.Rebalancing)
 
 	offGFN()
+	if rargs.ecUsed {
+		ec.ECM.CloseStreams(true /*with refc*/)
+	}
 }
 
diff --git a/xact/xreg/xreg.go b/xact/xreg/xreg.go
index 8b02e3cd9e..5a93eb6516 100644
--- a/xact/xreg/xreg.go
+++ b/xact/xreg/xreg.go
@@ -457,8 +457,12 @@ func (r *registry) hkDelOld(int64) time.Duration {
 	}
 	r.entries.mtx.RUnlock()
 
-	if len(toRemove) == 0 {
-		return hk.DelOldIval
+	d, ll := hk.DelOldIval, len(toRemove)
+	if l-ll > keepOldThreshold<<1 {
+		d >>= 1
+	}
+	if ll == 0 {
+		return d
 	}
 
 	// cleanup
@@ -467,7 +471,8 @@ func (r *registry) hkDelOld(int64) time.Duration {
 		r.entries.del(id)
 	}
 	r.entries.mtx.Unlock()
-	return hk.DelOldIval
+
+	return d
 }
 
 func (r *registry) renewByID(entry Renewable, bck *meta.Bck) (rns RenewRes) {
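
Reviewer note (illustrative, not part of the patch): the ec/manager.go and reb/globrun.go changes above give EC stream open/close an optional reference count: rebalance calls OpenStreams(true /*with refc*/) and CloseStreams(true /*with refc*/) around its run when EC is in use, while the apc.ActEcOpen / apc.ActEcClose control path and the BMD-driven toggling pass false and leave the counter untouched. The sketch below is a minimal, self-contained Go approximation of that pattern; streamMgr, refc, and enabled are made-up names standing in for the real Manager, _refc, and bundleEnabled fields.

package main

// Sketch of the "open/close with optional refcount" pattern from this diff.
// Not aistore code: names and types are simplified for illustration.
import (
	"fmt"
	"sync/atomic"
)

type streamMgr struct {
	refc    atomic.Int64 // held by long-running users (e.g., rebalance)
	enabled atomic.Bool  // whether the stream bundle is currently open
}

func (m *streamMgr) OpenStreams(withRefc bool) {
	if withRefc {
		m.refc.Add(1) // register a long-running user
	}
	if !m.enabled.CompareAndSwap(false, true) {
		return // already open; nothing else to do
	}
	fmt.Println("streams opened")
}

func (m *streamMgr) CloseStreams(withRefc bool) {
	if withRefc {
		m.refc.Add(-1) // long-running user is done
	}
	if !m.enabled.CompareAndSwap(true, false) {
		return // already closed
	}
	fmt.Println("streams closed")
}

func main() {
	m := &streamMgr{}
	m.OpenStreams(true /*with refc*/)  // rebalance start, EC buckets present
	m.OpenStreams(false /*no refc*/)   // one-off control request (apc.ActEcOpen analog)
	m.CloseStreams(true /*with refc*/) // rebalance done
	fmt.Println("refc:", m.refc.Load())
}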