diff --git a/xact/xs/archive.go b/xact/xs/archive.go index 1483b03f6c..ad26183a46 100644 --- a/xact/xs/archive.go +++ b/xact/xs/archive.go @@ -40,6 +40,7 @@ type ( streamingF } archwi struct { // archival work item; implements lrwi + j *jogger writer archive.Writer r *XactArch msg *cmn.ArchiveBckMsg @@ -55,10 +56,23 @@ type ( // finishing refc atomic.Int32 } + archtask struct { + wi *archwi + lrit *lriterator + } + jogger struct { + mpath *fs.Mountpath + workCh chan *archtask + stopCh cos.StopCh + } XactArch struct { streamingX - workCh chan *cmn.ArchiveBckMsg bckTo *meta.Bck + joggers struct { + wg sync.WaitGroup + m map[string]*jogger + sync.RWMutex + } pending struct { m map[string]*archwi sync.RWMutex @@ -98,9 +112,10 @@ func (p *archFactory) Start() (err error) { // // new x-archive // - workCh := make(chan *cmn.ArchiveBckMsg, maxNumInParallel*2) - r := &XactArch{streamingX: streamingX{p: &p.streamingF, config: cmn.GCO.Get()}, workCh: workCh} - r.pending.m = make(map[string]*archwi, maxNumInParallel*2) + r := &XactArch{streamingX: streamingX{p: &p.streamingF, config: cmn.GCO.Get()}} + r.pending.m = make(map[string]*archwi, maxNumInParallel) + avail := fs.GetAvail() + r.joggers.m = make(map[string]*jogger, len(avail)) p.xctn = r r.DemandBase.Init(p.UUID() /*== p.Args.UUID above*/, p.kind, p.Bck /*from*/, xact.IdleDefault) @@ -144,6 +159,25 @@ func (r *XactArch) Begin(msg *cmn.ArchiveBckMsg, archlom *core.LOM) (err error) return err } + // bind a new/existing jogger to this archwi based on archlom's mountpath + var exists bool + mpath := archlom.Mountpath() + r.joggers.Lock() + if wi.j, exists = r.joggers.m[mpath.Path]; !exists { + r.joggers.m[mpath.Path] = &jogger{ + mpath: mpath, + workCh: make(chan *archtask, maxNumInParallel*2), + } + wi.j = r.joggers.m[mpath.Path] + wi.j.stopCh.Init() + r.joggers.wg.Add(1) + go func() { + wi.j.run() + r.joggers.wg.Done() + }() + } + r.joggers.Unlock() + // fcreate at BEGIN time if core.T.SID() == wi.tsi.ID() { var ( @@ -156,6 +190,8 @@ func (r *XactArch) Begin(msg *cmn.ArchiveBckMsg, archlom *core.LOM) (err error) if !wi.archlom.IsChunked() { s = " append" lmfh, err = wi.beginAppend() + } else { + wi.wfh, err = wi.archlom.CreateWork(wi.fqn) } } else { wi.wfh, err = wi.archlom.CreateWork(wi.fqn) @@ -168,7 +204,7 @@ func (r *XactArch) Begin(msg *cmn.ArchiveBckMsg, archlom *core.LOM) (err error) } // construct format-specific writer; serialize for multi-target conc. writing - opts := archive.Opts{Serialize: nat > 1, TarFormat: wi.tarFormat} + opts := archive.Opts{Serialize: true, TarFormat: wi.tarFormat} wi.writer = archive.NewWriter(msg.Mime, wi.wfh, &wi.cksum, &opts) // append case (above) @@ -198,60 +234,43 @@ func (r *XactArch) Begin(msg *cmn.ArchiveBckMsg, archlom *core.LOM) (err error) func (r *XactArch) Do(msg *cmn.ArchiveBckMsg) { r.IncPending() - r.workCh <- msg + r.pending.RLock() + wi, ok := r.pending.m[msg.TxnUUID] + r.pending.RUnlock() + if !ok { + debug.Assert(r.ErrCnt() > 0) // see cleanup + r.Abort(r.Err()) + r.DecPending() + r.cleanup() + } + var lrit = &lriterator{} + err := lrit.init(r, &msg.ListRange, r.Bck(), true) + if err != nil { + r.Abort(err) + r.DecPending() + r.cleanup() + } + + wi.j.workCh <- &archtask{wi, lrit} + if r.Err() != nil { + wi.cleanup() + r.Abort(r.Err()) + r.cleanup() + } } func (r *XactArch) Run(wg *sync.WaitGroup) { - var err error nlog.Infoln(r.Name()) wg.Done() - for { - select { - case msg := <-r.workCh: - r.pending.RLock() - wi, ok := r.pending.m[msg.TxnUUID] - r.pending.RUnlock() - if !ok { - debug.Assert(r.ErrCnt() > 0) // see cleanup - goto fin - } - var ( - smap = core.T.Sowner().Get() - lrit = &lriterator{} - ) - err = lrit.init(r, &msg.ListRange, r.Bck(), true /*TODO: remove blocking*/) - if err != nil { - r.Abort(err) - goto fin - } - err = lrit.run(wi, smap) - if err != nil { - r.AddErr(err) - } - lrit.wait() - if r.Err() != nil { - wi.cleanup() - goto fin - } - if core.T.SID() == wi.tsi.ID() { - go r.finalize(wi) // async finalize this shard - } else { - r.sendTerm(wi.msg.TxnUUID, wi.tsi, nil) - r.pending.Lock() - delete(r.pending.m, msg.TxnUUID) - r.wiCnt.Dec() - r.pending.Unlock() - r.DecPending() - - core.FreeLOM(wi.archlom) - } - case <-r.IdleTimer(): - goto fin - case <-r.ChanAbort(): - goto fin - } + select { + case <-r.IdleTimer(): + r.cleanup() + case <-r.ChanAbort(): + r.cleanup() } -fin: +} + +func (r *XactArch) cleanup() { r.streamingX.fin(true /*unreg Rx*/) if r.Err() == nil { return @@ -264,6 +283,14 @@ fin: } clear(r.pending.m) r.pending.Unlock() + + r.joggers.Lock() + for _, jogger := range r.joggers.m { + jogger.stopCh.Close() + } + clear(r.joggers.m) + r.joggers.Unlock() + r.joggers.wg.Wait() } func (r *XactArch) doSend(lom *core.LOM, wi *archwi, fh cos.ReadOpenCloser) { @@ -337,7 +364,7 @@ func (r *XactArch) finalize(wi *archwi) { r.wiCnt.Dec() r.pending.Unlock() - ecode, err := r.fini(wi) + ecode, err := r._fini(wi) r.DecPending() if cmn.Rom.FastV(5, cos.SmoduleXs) { var s string @@ -355,7 +382,7 @@ func (r *XactArch) finalize(wi *archwi) { r.AddErr(err, 5, cos.SmoduleXs) } -func (r *XactArch) fini(wi *archwi) (ecode int, err error) { +func (r *XactArch) _fini(wi *archwi) (ecode int, err error) { wi.writer.Fini() if r.IsAborted() { @@ -428,6 +455,40 @@ func (r *XactArch) Snap() (snap *core.Snap) { return } +//////////// +// jogger // +//////////// + +func (j *jogger) run() { + nlog.Infoln("jogger started in mount path", j.mpath) + for { + select { + case archtask := <-j.workCh: + lrit, wi := archtask.lrit, archtask.wi + smap := core.T.Sowner().Get() + err := lrit.run(wi, smap) + if err != nil { + wi.r.AddErr(err) + } + lrit.wait() + if core.T.SID() == wi.tsi.ID() { + go wi.r.finalize(wi) // async finalize this shard + } else { + wi.r.sendTerm(wi.msg.TxnUUID, wi.tsi, nil) + wi.r.pending.Lock() + delete(wi.r.pending.m, wi.msg.TxnUUID) + wi.r.wiCnt.Dec() + wi.r.pending.Unlock() + wi.r.DecPending() + + core.FreeLOM(wi.archlom) + } + case <-j.stopCh.Listen(): + return + } + } +} + //////////// // archwi // ////////////