Skip to content

Commit

Permalink
rebalance vs dynamic EC streams; housekeeping; dsort; downloader
Browse files Browse the repository at this point in the history
* open/close and ref-count EC streams when rebalancing
* consolidate common housekeeping durations
  - xactions
  - notifications
  - transactions
* dsort & downloader: housekeep upon the first respective usage
* with substantial refactoring

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Sep 9, 2024
1 parent 1114868 commit a91636b
Show file tree
Hide file tree
Showing 18 changed files with 100 additions and 89 deletions.
11 changes: 0 additions & 11 deletions ais/fspathrgrp.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/cmn/nlog"
"github.com/NVIDIA/aistore/core"
"github.com/NVIDIA/aistore/ext/dsort"
"github.com/NVIDIA/aistore/fs"
"github.com/NVIDIA/aistore/ios"
"github.com/NVIDIA/aistore/res"
Expand Down Expand Up @@ -62,12 +61,6 @@ func (g *fsprungroup) attachMpath(mpath string, label ios.Label) (addedMi *fs.Mo
}

func (g *fsprungroup) _postAdd(action string, mi *fs.Mountpath) {
// NOTE:
// - currently, dsort doesn't handle (add/enable/disable/detach mountpath) at runtime
// - consider integrating via `xreg.LimitedCoexistence`
// - review all xact.IsMountpath(kind) == true
dsort.Managers.AbortAll(fmt.Errorf("%q %s", action, mi))

fspathsConfigAddDel(mi.Path, true /*add*/)
go func() {
if cmn.GCO.Get().Resilver.Enabled {
Expand Down Expand Up @@ -134,10 +127,6 @@ func (g *fsprungroup) doDD(action string, flags uint64, mpath string, dontResilv
if err != nil || rmi == nil {
return nil, err
}

// NOTE: above
dsort.Managers.AbortAll(fmt.Errorf("%q %s", action, rmi))

if numAvail == 0 {
nlog.Errorf("%s: lost (via %q) the last available mountpath %q", g.t.si, action, rmi)
g.postDD(rmi, action, nil /*xaction*/, nil /*error*/) // go ahead to disable/detach
Expand Down
4 changes: 2 additions & 2 deletions ais/prxnotif.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,9 +353,9 @@ func (n *notifs) housekeep(int64) time.Duration {
now := time.Now().UnixNano()
n.fin.mtx.Lock()
for _, nl := range n.fin.m {
timeout := hk.DelOldIval
timeout := hk.OldAgeNotif
if nl.Kind() == apc.ActList {
timeout = hk.OldAgeLsoNotif
timeout = hk.OldAgeNotifLso
}
if time.Duration(now-nl.EndTime()) > timeout {
n.fin.del(nl, true /*locked*/)
Expand Down
4 changes: 2 additions & 2 deletions ais/tgtec.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ func (t *target) httpecpost(w http.ResponseWriter, r *http.Request) {
action := apiItems[0]
switch action {
case apc.ActEcOpen:
ec.ECM.OpenStreams()
ec.ECM.OpenStreams(false /*with refc*/)
case apc.ActEcClose:
ec.ECM.CloseStreams()
ec.ECM.CloseStreams(false /*with refc*/)
default:
t.writeErr(w, r, errActEc(action))
}
Expand Down
9 changes: 4 additions & 5 deletions ais/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

// GC
const (
gcTxnsInterval = time.Hour
gcTxnsNumKeep = 16
gcTxnsTimeotMult = 10

Expand Down Expand Up @@ -157,7 +156,7 @@ func (txns *transactions) init(t *target) {
txns.t = t
txns.m = make(map[string]txn, 8)
txns.rendezvous.m = make(map[string]rndzvs, 8)
hk.Reg("txn"+hk.NameSuffix, txns.housekeep, gcTxnsInterval)
hk.Reg("txn"+hk.NameSuffix, txns.housekeep, hk.DelOldIval)
}

func (txns *transactions) begin(txn txn, nlps ...core.NLP) (err error) {
Expand Down Expand Up @@ -339,15 +338,15 @@ func (txns *transactions) housekeep(int64) (d time.Duration) {
orphans []txn
config = cmn.GCO.Get()
)
d = gcTxnsInterval
d = hk.DelOldIval
txns.mtx.Lock()
l := len(txns.m)
if l == 0 {
txns.mtx.Unlock()
return
}
if l > max(gcTxnsNumKeep*4, 16) {
d = gcTxnsInterval / 10
if l > max(gcTxnsNumKeep<<2, 32) {
d >>= 2
}
now := time.Now()
for _, txn := range txns.m {
Expand Down
16 changes: 11 additions & 5 deletions ec/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func initManager() error {
}

if ECM.bmd.IsECUsed() { // TODO -- FIXME: remove
ECM.OpenStreams()
ECM.OpenStreams(false)
}
return nil
}
Expand Down Expand Up @@ -92,7 +92,10 @@ func cbReq(hdr *transport.ObjHdr, _ io.ReadCloser, _ any, err error) {
}
}

func (mgr *Manager) OpenStreams() {
func (mgr *Manager) OpenStreams(withRefc bool) {
if withRefc {
mgr._refc.Inc()
}
if !mgr.bundleEnabled.CAS(false, true) {
return
}
Expand All @@ -119,7 +122,10 @@ func (mgr *Manager) OpenStreams() {
mgr.respBundle.Store(bundle.New(client, respSbArgs))
}

func (mgr *Manager) CloseStreams() {
func (mgr *Manager) CloseStreams(withRefc bool) {
if withRefc {
mgr._refc.Dec()
}
if !mgr.bundleEnabled.CAS(true, false) {
return
}
Expand Down Expand Up @@ -320,9 +326,9 @@ func (mgr *Manager) BMDChanged() error {

// TODO -- FIXME: remove
if newBMD.IsECUsed() && !oldBMD.IsECUsed() {
mgr.OpenStreams()
mgr.OpenStreams(false)
} else if !newBMD.IsECUsed() && oldBMD.IsECUsed() {
mgr.CloseStreams()
mgr.CloseStreams(false)
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion ext/dload/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type (
// Certificate check is disabled for now and does not depend on cluster settings.
clientH *http.Client
clientTLS *http.Client

once sync.Once // newInfoStore upon the first execution
}
)

Expand All @@ -68,7 +70,6 @@ func Init(tstats stats.Tracker, db kvdb.Driver, clientConf *cmn.ClientConf) {
{
g.tstats = tstats
g.db = db
g.store = newInfoStore(db)
}
xreg.RegNonBckXact(&factory{})
}
Expand Down
5 changes: 5 additions & 0 deletions ext/dload/xact.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ func (*factory) New(args xreg.Args, _ *meta.Bck) xreg.Renewable {
// Start builds a new xaction from this factory via newXact, stores it in
// p.xctn, and launches its Run method in a separate goroutine.
// It always returns nil.
func (p *factory) Start() error {
	xctn := newXact(p)
	p.xctn = xctn

	// Create the shared info store lazily: g.once ensures newInfoStore is
	// invoked only by the first Start call rather than at package Init time.
	initStore := func() { g.store = newInfoStore(g.db) }
	g.once.Do(initStore)

	go xctn.Run(nil)
	return nil
}
Expand Down
4 changes: 2 additions & 2 deletions ext/dsort/dsort.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ func (m *Manager) participateInRecordDistribution(targetOrder meta.Nodes) (curre
)
group.Go(func() error {
var (
buf, slab = g.mm.AllocSize(serializationBufSize)
buf, slab = g.mem.AllocSize(serializationBufSize)
msgpw = msgp.NewWriterBuf(w, buf)
)
defer slab.Free(buf)
Expand Down Expand Up @@ -861,7 +861,7 @@ func (m *Manager) _dist(si *meta.Snode, s []*shard.Shard, order map[string]*shar
)
group.Go(func() error {
var (
buf, slab = g.mm.AllocSize(serializationBufSize)
buf, slab = g.mem.AllocSize(serializationBufSize)
msgpw = msgp.NewWriterBuf(w, buf)
md = &CreationPhaseMetadata{Shards: s, SendOrder: order}
)
Expand Down
4 changes: 2 additions & 2 deletions ext/dsort/dsort_general.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func (ds *dsorterGeneral) loadLocal(w io.Writer, obj *shard.RecordObj) (written
)

if storeType != shard.SGLStoreType { // SGL does not need buffer as it is buffer itself
buf, slab = g.mm.AllocSize(obj.Size)
buf, slab = g.mem.AllocSize(obj.Size)
}

defer func() {
Expand Down Expand Up @@ -556,7 +556,7 @@ func (ds *dsorterGeneral) recvResp(hdr *transport.ObjHdr, object io.Reader, err
return nil
}

buf, slab := g.mm.AllocSize(hdr.ObjAttrs.Size)
buf, slab := g.mem.AllocSize(hdr.ObjAttrs.Size)
writer.n, writer.err = io.CopyBuffer(writer.w, object, buf)
writer.wg.Done()
slab.Free(buf)
Expand Down
2 changes: 1 addition & 1 deletion ext/dsort/dsort_mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (c *rwConnector) connectReader(key string, r io.Reader, size int64) (err er
c.mu.Unlock()

if !all {
rw.sgl = g.mm.NewSGL(size)
rw.sgl = g.mem.NewSGL(size)
_, err = io.Copy(rw.sgl, r)
rw.wgr.Done()
return
Expand Down
30 changes: 15 additions & 15 deletions ext/dsort/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,9 +405,9 @@ func TargetHandler(w http.ResponseWriter, r *http.Request) {
case apc.Start:
tstartHandler(w, r)
case apc.Records:
Managers.recordsHandler(w, r)
g.mg.recordsHandler(w, r)
case apc.Shards:
Managers.shardsHandler(w, r)
g.mg.shardsHandler(w, r)
case apc.Abort:
tabortHandler(w, r)
case apc.Remove:
Expand Down Expand Up @@ -455,7 +455,7 @@ func tinitHandler(w http.ResponseWriter, r *http.Request) {
}

managerUUID := apiItems[0]
m, err := Managers.Add(managerUUID) // NOTE: returns manager locked iff err == nil
m, err := g.mg.Add(managerUUID) // NOTE: returns manager locked iff err == nil
if err != nil {
cmn.WriteErr(w, r, err)
return
Expand Down Expand Up @@ -491,7 +491,7 @@ func tstartHandler(w http.ResponseWriter, r *http.Request) {
}

managerUUID := apiItems[0]
m, exists := Managers.Get(managerUUID, false /*incl. archived*/)
m, exists := g.mg.Get(managerUUID, false /*incl. archived*/)
if !exists {
s := fmt.Sprintf("invalid request: job %q does not exist", managerUUID)
cmn.WriteErrMsg(w, r, s, http.StatusNotFound)
Expand Down Expand Up @@ -536,7 +536,7 @@ func (m *Manager) errHandler(err error) {
// shardsHandler is the handler for the HTTP endpoint /v1/sort/shards.
// A valid POST to this endpoint results in a new shard being created locally based on the contents
// of the incoming request body. The shard is then sent to the correct target in the cluster as per HRW.
func (managers *ManagerGroup) shardsHandler(w http.ResponseWriter, r *http.Request) {
func (mg *managerGroup) shardsHandler(w http.ResponseWriter, r *http.Request) {
if !checkHTTPMethod(w, r, http.MethodPost) {
return
}
Expand All @@ -545,7 +545,7 @@ func (managers *ManagerGroup) shardsHandler(w http.ResponseWriter, r *http.Reque
return
}
managerUUID := apiItems[0]
m, exists := managers.Get(managerUUID, false /*incl. archived*/)
m, exists := mg.Get(managerUUID, false /*incl. archived*/)
if !exists {
s := fmt.Sprintf("invalid request: job %q does not exist", managerUUID)
cmn.WriteErrMsg(w, r, s, http.StatusNotFound)
Expand All @@ -562,7 +562,7 @@ func (managers *ManagerGroup) shardsHandler(w http.ResponseWriter, r *http.Reque
}

var (
buf, slab = g.mm.AllocSize(serializationBufSize)
buf, slab = g.mem.AllocSize(serializationBufSize)
tmpMetadata = &CreationPhaseMetadata{}
)
defer slab.Free(buf)
Expand All @@ -585,7 +585,7 @@ func (managers *ManagerGroup) shardsHandler(w http.ResponseWriter, r *http.Reque
// recordsHandler is the handler for the HTTP endpoint /v1/sort/records.
// A valid POST to this endpoint updates this target's dsortManager.Records with the
// []Records from the request body, along with some related state variables.
func (managers *ManagerGroup) recordsHandler(w http.ResponseWriter, r *http.Request) {
func (mg *managerGroup) recordsHandler(w http.ResponseWriter, r *http.Request) {
if !checkHTTPMethod(w, r, http.MethodPost) {
return
}
Expand All @@ -594,7 +594,7 @@ func (managers *ManagerGroup) recordsHandler(w http.ResponseWriter, r *http.Requ
return
}
managerUUID := apiItems[0]
m, exists := managers.Get(managerUUID, false /*incl. archived*/)
m, exists := mg.Get(managerUUID, false /*incl. archived*/)
if !exists {
s := fmt.Sprintf("invalid request: job %q does not exist", managerUUID)
cmn.WriteErrMsg(w, r, s, http.StatusNotFound)
Expand Down Expand Up @@ -633,7 +633,7 @@ func (managers *ManagerGroup) recordsHandler(w http.ResponseWriter, r *http.Requ
}

var (
buf, slab = g.mm.AllocSize(serializationBufSize)
buf, slab = g.mem.AllocSize(serializationBufSize)
records = shard.NewRecords(int(d))
)
defer slab.Free(buf)
Expand Down Expand Up @@ -669,7 +669,7 @@ func tabortHandler(w http.ResponseWriter, r *http.Request) {
}

managerUUID := apiItems[0]
m, exists := Managers.Get(managerUUID, true /*incl. archived*/)
m, exists := g.mg.Get(managerUUID, true /*incl. archived*/)
if !exists {
s := fmt.Sprintf("%s: [dsort] %s does not exist", core.T, managerUUID)
cmn.WriteErrMsg(w, r, s, http.StatusNotFound)
Expand All @@ -695,7 +695,7 @@ func tremoveHandler(w http.ResponseWriter, r *http.Request) {
}

managerUUID := apiItems[0]
if err := Managers.Remove(managerUUID); err != nil {
if err := g.mg.Remove(managerUUID); err != nil {
cmn.WriteErr(w, r, err)
return
}
Expand All @@ -719,7 +719,7 @@ func tlistHandler(w http.ResponseWriter, r *http.Request) {
}
}

w.Write(cos.MustMarshal(Managers.List(regex, onlyActive)))
w.Write(cos.MustMarshal(g.mg.List(regex, onlyActive)))
}

// /v1/sort/metrics.
Expand All @@ -734,7 +734,7 @@ func tmetricsHandler(w http.ResponseWriter, r *http.Request) {
}

managerUUID := apiItems[0]
m, exists := Managers.Get(managerUUID, true /*incl. archived*/)
m, exists := g.mg.Get(managerUUID, true /*incl. archived*/)
if !exists {
s := fmt.Sprintf("%s: [dsort] %s does not exist", core.T, managerUUID)
cmn.WriteErrMsg(w, r, s, http.StatusNotFound)
Expand Down Expand Up @@ -763,7 +763,7 @@ func tfiniHandler(w http.ResponseWriter, r *http.Request) {
}

managerUUID, tid := apiItems[0], apiItems[1]
m, exists := Managers.Get(managerUUID, false /*incl. archived*/)
m, exists := g.mg.Get(managerUUID, false /*incl. archived*/)
if !exists {
s := fmt.Sprintf("invalid request: job %q does not exist", managerUUID)
cmn.WriteErrMsg(w, r, s, http.StatusNotFound)
Expand Down
14 changes: 9 additions & 5 deletions ext/dsort/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ const (
type (
global struct {
tstats stats.Tracker
mm *memsys.MMSA
mem *memsys.MMSA

// internal
mg *managerGroup
once sync.Once // reg housekeep upon the first usage
}
buildingShardInfo struct {
shardName string
Expand All @@ -80,7 +84,7 @@ type (
Metrics *Metrics `json:"metrics"`
Pars *parsedReqSpec `json:"pars"`

mg *ManagerGroup // parent
mg *managerGroup // parent
mu sync.Mutex
smap *meta.Smap
recm *shard.RecordManager
Expand Down Expand Up @@ -134,14 +138,14 @@ func Pinit(si core.Node, config *cmn.Config) {
}

func Tinit(tstats stats.Tracker, db kvdb.Driver, config *cmn.Config) {
Managers = NewManagerGroup(db, false)
g.mg = newManagerGroup(db)

xreg.RegBckXact(&factory{})

debug.Assert(g.mm == nil) // only once
debug.Assert(g.mem == nil) // only once
{
g.tstats = tstats
g.mm = core.T.PageMM()
g.mem = core.T.PageMM()
}
fs.CSM.Reg(ct.DsortFileType, &ct.DsortFile{})
fs.CSM.Reg(ct.DsortWorkfileType, &ct.DsortFile{})
Expand Down
Loading

0 comments on commit a91636b

Please sign in to comment.