Skip to content

Commit

Permalink
follow-up
Browse files Browse the repository at this point in the history
* amend log: `ec` & `transport/bundle` packages

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Sep 2, 2024
1 parent 049a3a3 commit 7530930
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 56 deletions.
6 changes: 3 additions & 3 deletions ec/getjogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (*getJogger) freeCtx(ctx *restoreCtx) {
}

func (c *getJogger) run() {
nlog.Infof("started EC for mountpath: %s, bucket %s", c.mpath, c.parent.bck)
nlog.Infoln("start [", c.parent.bck.Cname(""), c.mpath, "]")

for {
select {
Expand All @@ -106,7 +106,7 @@ func (c *getJogger) run() {
}

func (c *getJogger) stop() {
nlog.Infof("stopping EC for mountpath: %s, bucket: %s", c.mpath, c.parent.bck)
nlog.Infoln("stop [", c.parent.bck.Cname(""), c.mpath, "]")
c.stopCh.Close()
}

Expand Down Expand Up @@ -792,7 +792,7 @@ func (c *getJogger) restoreEncoded(ctx *restoreCtx) error {
if err := c.uploadRestoredSlices(ctx, restored); err != nil {
nlog.Errorf("failed to upload restored slices of %s: %v", ctx.lom, err)
} else if cmn.Rom.FastV(4, cos.SmoduleEC) {
nlog.Infof("slices %s restored successfully", ctx.lom)
nlog.Infof("restored %s slices", ctx.lom)
}

c.freeDownloaded(ctx)
Expand Down
79 changes: 40 additions & 39 deletions ec/putjogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,26 +79,24 @@ func (ctx *encodeCtx) freeReplica() {
// putJogger //
///////////////

func (*putJogger) newCtx(lom *core.LOM, meta *Metadata) (ctx *encodeCtx, err error) {
ctx = allocCtx()
ctx.lom = lom
ctx.dataSlices = lom.Bprops().EC.DataSlices
ctx.paritySlices = lom.Bprops().EC.ParitySlices
ctx.meta = meta

totalCnt := ctx.paritySlices + ctx.dataSlices
ctx.sliceSize = SliceSize(ctx.lom.Lsize(), ctx.dataSlices)
ctx.slices = make([]*slice, totalCnt)
ctx.padSize = ctx.sliceSize*int64(ctx.dataSlices) - ctx.lom.Lsize()
debug.Assert(ctx.padSize >= 0)

ctx.fh, err = cos.NewFileHandle(lom.FQN)
return ctx, err
}
func (c *putJogger) run(wg *sync.WaitGroup) {
nlog.Infoln("start [", c.parent.bck.Cname(""), c.mpath, "]")

func (*putJogger) freeCtx(ctx *encodeCtx) {
*ctx = emptyCtx
encCtxPool.Put(ctx)
defer wg.Done()
c.buffer, c.slab = g.pmm.Alloc()
for {
select {
case req := <-c.putCh:
c.processRequest(req)
freeReq(req)
case req := <-c.xactCh:
c.processRequest(req)
freeReq(req)
case <-c.stopCh.Listen():
c.freeResources()
return
}
}
}

func (c *putJogger) freeResources() {
Expand Down Expand Up @@ -139,27 +137,8 @@ func (c *putJogger) processRequest(req *request) {
}
}

func (c *putJogger) run(wg *sync.WaitGroup) {
nlog.Infof("Started EC for mountpath: %s, bucket %s", c.mpath, c.parent.bck)
defer wg.Done()
c.buffer, c.slab = g.pmm.Alloc()
for {
select {
case req := <-c.putCh:
c.processRequest(req)
freeReq(req)
case req := <-c.xactCh:
c.processRequest(req)
freeReq(req)
case <-c.stopCh.Listen():
c.freeResources()
return
}
}
}

func (c *putJogger) stop() {
nlog.Infof("Stopping EC for mountpath: %s, bucket %s", c.mpath, c.parent.bck)
nlog.Infoln("stop [", c.parent.bck.Cname(""), c.mpath, "]")
c.stopCh.Close()
}

Expand Down Expand Up @@ -288,6 +267,28 @@ func (c *putJogger) encode(req *request, lom *core.LOM) error {
return nil
}

func (*putJogger) newCtx(lom *core.LOM, meta *Metadata) (ctx *encodeCtx, err error) {
ctx = allocCtx()
ctx.lom = lom
ctx.dataSlices = lom.Bprops().EC.DataSlices
ctx.paritySlices = lom.Bprops().EC.ParitySlices
ctx.meta = meta

totalCnt := ctx.paritySlices + ctx.dataSlices
ctx.sliceSize = SliceSize(ctx.lom.Lsize(), ctx.dataSlices)
ctx.slices = make([]*slice, totalCnt)
ctx.padSize = ctx.sliceSize*int64(ctx.dataSlices) - ctx.lom.Lsize()
debug.Assert(ctx.padSize >= 0)

ctx.fh, err = cos.NewFileHandle(lom.FQN)
return ctx, err
}

func (*putJogger) freeCtx(ctx *encodeCtx) {
*ctx = emptyCtx
encCtxPool.Put(ctx)
}

func (c *putJogger) ctSendCallback(hdr *transport.ObjHdr, _ io.ReadCloser, _ any, err error) {
g.smm.Free(hdr.Opaque)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion ec/xaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (r *xactECBase) sendByDaemonID(daemonIDs []string, o *transport.Obj, reader
for _, id := range daemonIDs {
si, ok := smap.Tmap[id]
if !ok {
nlog.Errorf("t[%s] not found", id)
nlog.Errorln(meta.Tname(id), "not found in", smap.StringEx())
continue
}
nodes = append(nodes, si)
Expand Down
14 changes: 9 additions & 5 deletions transport/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,7 @@ func (s *streamBase) _lid(sid, dstID string, extra *Extra) {
sb.WriteByte('[')
sb.WriteString(strconv.FormatInt(s.sessID, 10))

if extra.Compressed() {
sb.WriteByte('[')
sb.WriteString(cos.ToSizeIEC(int64(extra.Config.Transport.LZ4BlockMaxSize), 0))
sb.WriteByte(']')
}
extra.Lid(&sb)

sb.WriteString("]=>")
sb.WriteString(dstID)
Expand Down Expand Up @@ -334,6 +330,14 @@ func (extra *Extra) Compressed() bool {
return extra.Compression != "" && extra.Compression != apc.CompressNever
}

func (extra *Extra) Lid(sb *strings.Builder) {
if extra.Compressed() {
sb.WriteByte('[')
sb.WriteString(cos.ToSizeIEC(int64(extra.Config.Transport.LZ4BlockMaxSize), 0))
sb.WriteByte(']')
}
}

//
// misc
//
Expand Down
37 changes: 29 additions & 8 deletions transport/bundle/stream_bundle.go → transport/bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package bundle

import (
"fmt"
"strconv"
"strings"
"sync"
ratomic "sync/atomic"

Expand Down Expand Up @@ -79,7 +81,7 @@ func (sb *Streams) Trname() string { return sb.trname }

func New(cl transport.Client, args Args) (sb *Streams) {
if args.Net == "" {
args.Net = cmn.NetIntraData
args.Net = cmn.NetIntraData // intra-cluster default
}
sb = &Streams{
smap: &meta.Smap{}, // empty on purpose (see Resync)
Expand All @@ -91,26 +93,22 @@ func New(cl transport.Client, args Args) (sb *Streams) {
multiplier: args.Multiplier,
manualResync: args.ManualResync,
}
debug.Assert(args.Extra != nil && args.Extra.Config != nil)
sb.extra = *args.Extra
if sb.multiplier == 0 {
sb.multiplier = 1
}
if sb.extra.Config == nil {
sb.extra.Config = cmn.GCO.Get()
}
if !sb.extra.Compressed() {
sb.lid = fmt.Sprintf("sb[%s-%s-%s]", core.T.SID(), sb.network, sb.trname)
} else {
sb.lid = fmt.Sprintf("sb[%s-%s-%s[%s]]", core.T.SID(), sb.network, sb.trname,
cos.ToSizeIEC(int64(sb.extra.Config.Transport.LZ4BlockMaxSize), 0))
}

// update streams when Smap changes
sb.smaplock.Lock()
sb.Resync()
sb.smaplock.Unlock()

sb._lid()
nlog.Infoln("open", sb.lid)

// register this stream-bundle as Smap listener
if !sb.manualResync {
listeners := core.T.Sowner().Listeners()
Expand All @@ -119,12 +117,35 @@ func New(cl transport.Client, args Args) (sb *Streams) {
return
}

func (sb *Streams) _lid() {
var s strings.Builder

s.WriteString("sb-[")
s.WriteString(core.T.SID())
if sb.network != cmn.NetIntraData {
s.WriteByte('-')
s.WriteString(sb.network)
}
s.WriteString("-v")
s.WriteString(strconv.FormatInt(sb.smap.Version, 10))
s.WriteByte('-')
s.WriteString(sb.trname)

sb.extra.Lid(&s)

s.WriteByte(']')

sb.lid = s.String() // approx. "sb[%s-%s-%s...]"
}

// Close closes all contained streams and unregisters the bundle from Smap listeners;
// graceful=true blocks until all pending objects get completed (for "completion", see transport/README.md)
func (sb *Streams) Close(gracefully bool) {
if gracefully {
nlog.Infoln("close", sb.lid)
sb.apply(closeFin)
} else {
nlog.Infoln("stop", sb.lid)
sb.apply(closeStop)
}
if !sb.manualResync {
Expand Down

0 comments on commit 7530930

Please sign in to comment.