Skip to content

Commit

Permalink
feat: implemented jogger to execute archive xact on each mountpath
Browse files Browse the repository at this point in the history
Signed-off-by: Tony Chen <[email protected]>
  • Loading branch information
Nahemah1022 committed Jul 31, 2024
1 parent c06c0a5 commit b7d0765
Showing 1 changed file with 116 additions and 55 deletions.
171 changes: 116 additions & 55 deletions xact/xs/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type (
streamingF
}
archwi struct { // archival work item; implements lrwi
j *jogger
writer archive.Writer
r *XactArch
msg *cmn.ArchiveBckMsg
Expand All @@ -55,10 +56,23 @@ type (
// finishing
refc atomic.Int32
}
archtask struct {
wi *archwi
lrit *lriterator
}
jogger struct {
mpath *fs.Mountpath
workCh chan *archtask
stopCh cos.StopCh
}
XactArch struct {
streamingX
workCh chan *cmn.ArchiveBckMsg
bckTo *meta.Bck
joggers struct {
wg sync.WaitGroup
m map[string]*jogger
sync.RWMutex
}
pending struct {
m map[string]*archwi
sync.RWMutex
Expand Down Expand Up @@ -98,9 +112,10 @@ func (p *archFactory) Start() (err error) {
//
// new x-archive
//
workCh := make(chan *cmn.ArchiveBckMsg, maxNumInParallel*2)
r := &XactArch{streamingX: streamingX{p: &p.streamingF, config: cmn.GCO.Get()}, workCh: workCh}
r.pending.m = make(map[string]*archwi, maxNumInParallel*2)
r := &XactArch{streamingX: streamingX{p: &p.streamingF, config: cmn.GCO.Get()}}
r.pending.m = make(map[string]*archwi, maxNumInParallel)
avail := fs.GetAvail()
r.joggers.m = make(map[string]*jogger, len(avail))
p.xctn = r
r.DemandBase.Init(p.UUID() /*== p.Args.UUID above*/, p.kind, p.Bck /*from*/, xact.IdleDefault)

Expand Down Expand Up @@ -144,6 +159,25 @@ func (r *XactArch) Begin(msg *cmn.ArchiveBckMsg, archlom *core.LOM) (err error)
return err
}

// bind a new/existing jogger to this archwi based on archlom's mountpath
var exists bool
mpath := archlom.Mountpath()
r.joggers.Lock()
if wi.j, exists = r.joggers.m[mpath.Path]; !exists {
r.joggers.m[mpath.Path] = &jogger{
mpath: mpath,
workCh: make(chan *archtask, maxNumInParallel*2),
}
wi.j = r.joggers.m[mpath.Path]
wi.j.stopCh.Init()
r.joggers.wg.Add(1)
go func() {
wi.j.run()
r.joggers.wg.Done()
}()
}
r.joggers.Unlock()

// fcreate at BEGIN time
if core.T.SID() == wi.tsi.ID() {
var (
Expand All @@ -156,6 +190,8 @@ func (r *XactArch) Begin(msg *cmn.ArchiveBckMsg, archlom *core.LOM) (err error)
if !wi.archlom.IsChunked() {
s = " append"
lmfh, err = wi.beginAppend()
} else {
wi.wfh, err = wi.archlom.CreateWork(wi.fqn)
}
} else {
wi.wfh, err = wi.archlom.CreateWork(wi.fqn)
Expand All @@ -168,7 +204,7 @@ func (r *XactArch) Begin(msg *cmn.ArchiveBckMsg, archlom *core.LOM) (err error)
}

// construct format-specific writer; serialize for multi-target conc. writing
opts := archive.Opts{Serialize: nat > 1, TarFormat: wi.tarFormat}
opts := archive.Opts{Serialize: true, TarFormat: wi.tarFormat}
wi.writer = archive.NewWriter(msg.Mime, wi.wfh, &wi.cksum, &opts)

// append case (above)
Expand Down Expand Up @@ -198,60 +234,43 @@ func (r *XactArch) Begin(msg *cmn.ArchiveBckMsg, archlom *core.LOM) (err error)

// Do dispatches one archive request, identified by msg.TxnUUID, to the jogger
// that Begin bound to the corresponding work item.
//
// The pending counter is incremented up front. On the two early-failure paths
// below (unknown transaction; list-range iterator failing to initialize) the
// counter is decremented here, the xaction is aborted and cleaned up, and Do
// returns without enqueueing anything. Otherwise the matching decrement is
// performed by whoever completes the task (see jogger.run and finalize).
func (r *XactArch) Do(msg *cmn.ArchiveBckMsg) {
	r.IncPending()

	r.pending.RLock()
	wi, ok := r.pending.m[msg.TxnUUID]
	r.pending.RUnlock()
	if !ok {
		debug.Assert(r.ErrCnt() > 0) // see cleanup
		r.Abort(r.Err())
		r.DecPending()
		r.cleanup()
		// wi is nil here: proceeding would dereference it below
		return
	}

	lrit := &lriterator{}
	if err := lrit.init(r, &msg.ListRange, r.Bck(), true); err != nil {
		r.Abort(err)
		r.DecPending()
		r.cleanup()
		// do not enqueue an uninitialized iterator; the pending counter
		// has already been decremented for this request
		return
	}

	// hand over to the jogger bound to this work item
	wi.j.workCh <- &archtask{wi: wi, lrit: lrit}
	if r.Err() != nil {
		wi.cleanup()
		r.Abort(r.Err())
		r.cleanup()
	}
}

func (r *XactArch) Run(wg *sync.WaitGroup) {
var err error
nlog.Infoln(r.Name())
wg.Done()
for {
select {
case msg := <-r.workCh:
r.pending.RLock()
wi, ok := r.pending.m[msg.TxnUUID]
r.pending.RUnlock()
if !ok {
debug.Assert(r.ErrCnt() > 0) // see cleanup
goto fin
}
var (
smap = core.T.Sowner().Get()
lrit = &lriterator{}
)
err = lrit.init(r, &msg.ListRange, r.Bck(), true /*TODO: remove blocking*/)
if err != nil {
r.Abort(err)
goto fin
}
err = lrit.run(wi, smap)
if err != nil {
r.AddErr(err)
}
lrit.wait()
if r.Err() != nil {
wi.cleanup()
goto fin
}
if core.T.SID() == wi.tsi.ID() {
go r.finalize(wi) // async finalize this shard
} else {
r.sendTerm(wi.msg.TxnUUID, wi.tsi, nil)
r.pending.Lock()
delete(r.pending.m, msg.TxnUUID)
r.wiCnt.Dec()
r.pending.Unlock()
r.DecPending()

core.FreeLOM(wi.archlom)
}
case <-r.IdleTimer():
goto fin
case <-r.ChanAbort():
goto fin
}
select {
case <-r.IdleTimer():
r.cleanup()
case <-r.ChanAbort():
r.cleanup()
}
fin:
}

func (r *XactArch) cleanup() {
r.streamingX.fin(true /*unreg Rx*/)
if r.Err() == nil {
return
Expand All @@ -264,6 +283,14 @@ fin:
}
clear(r.pending.m)
r.pending.Unlock()

r.joggers.Lock()
for _, jogger := range r.joggers.m {
jogger.stopCh.Close()
}
clear(r.joggers.m)
r.joggers.Unlock()
r.joggers.wg.Wait()
}

func (r *XactArch) doSend(lom *core.LOM, wi *archwi, fh cos.ReadOpenCloser) {
Expand Down Expand Up @@ -337,7 +364,7 @@ func (r *XactArch) finalize(wi *archwi) {
r.wiCnt.Dec()
r.pending.Unlock()

ecode, err := r.fini(wi)
ecode, err := r._fini(wi)
r.DecPending()
if cmn.Rom.FastV(5, cos.SmoduleXs) {
var s string
Expand All @@ -355,7 +382,7 @@ func (r *XactArch) finalize(wi *archwi) {
r.AddErr(err, 5, cos.SmoduleXs)
}

func (r *XactArch) fini(wi *archwi) (ecode int, err error) {
func (r *XactArch) _fini(wi *archwi) (ecode int, err error) {
wi.writer.Fini()

if r.IsAborted() {
Expand Down Expand Up @@ -428,6 +455,40 @@ func (r *XactArch) Snap() (snap *core.Snap) {
return
}

////////////
// jogger //
////////////

// run is the jogger's main loop. It receives archive tasks from j.workCh and
// executes them one at a time until j.stopCh fires.
//
// For each task it runs the list-range iterator against the work item,
// records a returned error on the parent xaction, and waits for the iterator.
// Then:
//   - if this target is the one designated by wi.tsi, the shard is finalized
//     asynchronously;
//   - otherwise a termination message is sent to wi.tsi, the work item is
//     removed from the xaction's pending map (with the related counters
//     decremented), and the archive LOM is freed.
func (j *jogger) run() {
	nlog.Infoln("jogger started in mount path", j.mpath)
	for {
		select {
		case <-j.stopCh.Listen():
			return
		case task := <-j.workCh:
			var (
				wi    = task.wi
				xarch = wi.r
			)
			if err := task.lrit.run(wi, core.T.Sowner().Get()); err != nil {
				xarch.AddErr(err)
			}
			task.lrit.wait()

			if core.T.SID() == wi.tsi.ID() {
				go xarch.finalize(wi) // async finalize this shard
				continue
			}

			// not the designated target: notify it and drop the work item
			xarch.sendTerm(wi.msg.TxnUUID, wi.tsi, nil)

			xarch.pending.Lock()
			delete(xarch.pending.m, wi.msg.TxnUUID)
			xarch.wiCnt.Dec()
			xarch.pending.Unlock()

			xarch.DecPending()
			core.FreeLOM(wi.archlom)
		}
	}
}

////////////
// archwi //
////////////
Expand Down

0 comments on commit b7d0765

Please sign in to comment.