Skip to content

Commit

Permalink
transport header & burst size can now be set at runtime
Browse files Browse the repository at this point in the history
* intra-cluster transport: the two knobs were readonly
  - not anymore
* with refactoring

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Aug 24, 2024
1 parent 6c09796 commit 0a54545
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 52 deletions.
2 changes: 1 addition & 1 deletion ais/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (t *target) init(config *cmn.Config) {

t.fsprg.init(t, newVol) // subgroup of the daemon.rg rungroup

sc := transport.Init(ts, config) // init transport sub-system; new stream collector
sc := transport.Init(ts) // init transport sub-system; new stream collector
daemon.rg.add(sc)

t.fshc = health.NewFSHC(t)
Expand Down
33 changes: 21 additions & 12 deletions cmn/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,8 +575,8 @@ type (
LZ4FrameChecksum bool `json:"lz4_frame_checksum"`
}
TransportConfToSet struct {
MaxHeaderSize *int `json:"max_header,omitempty" list:"readonly"`
Burst *int `json:"burst_buffer,omitempty" list:"readonly"`
MaxHeaderSize *int `json:"max_header,omitempty"`
Burst *int `json:"burst_buffer,omitempty"`
IdleTeardown *cos.Duration `json:"idle_teardown,omitempty"`
QuiesceTime *cos.Duration `json:"quiescent,omitempty"`
LZ4BlockMaxSize *cos.SizeIEC `json:"lz4_block,omitempty"`
Expand Down Expand Up @@ -1597,27 +1597,36 @@ func (c *MemsysConf) Validate() (err error) {
// TransportConf //
///////////////////

const (
DfltTransportHeader = 4 * cos.KiB // memsys.PageSize
MaxTransportHeader = 128 * cos.KiB // memsys.MaxPageSlabSize

DfltTransportBurst = 256
MaxTransportBurst = 4096
)

// NOTE: uncompressed block sizes - the enum currently supported by the github.com/pierrec/lz4
func (c *TransportConf) Validate() (err error) {
if c.LZ4BlockMaxSize != 64*cos.KiB && c.LZ4BlockMaxSize != 256*cos.KiB &&
c.LZ4BlockMaxSize != cos.MiB && c.LZ4BlockMaxSize != 4*cos.MiB {
return fmt.Errorf("invalid transport.block_size %s (expected one of: [64K, 256K, 1MB, 4MB])",
return fmt.Errorf("invalid transport.block_size %s, expecting one of: [64K, 256K, 1MB, 4MB]",
c.LZ4BlockMaxSize)
}
if c.Burst < 0 {
return fmt.Errorf("invalid transport.burst_buffer: %v (expected >0)", c.Burst)
if c.Burst != 0 {
if c.Burst < 32 || c.Burst > MaxTransportBurst {
return fmt.Errorf("invalid transport.burst_buffer: %d, expecting [32, 4KiB] range or 0 (default)", c.Burst)
}
}
if c.MaxHeaderSize < 0 {
return fmt.Errorf("invalid transport.max_header: %v (expected >0)", c.MaxHeaderSize)
if c.MaxHeaderSize != 0 {
if c.MaxHeaderSize < 512 || c.MaxHeaderSize > MaxTransportHeader {
return fmt.Errorf("invalid transport.max_header: %v, expecting (0, 128KiB] range or 0 (default)", c.MaxHeaderSize)
}
}
if c.IdleTeardown.D() < time.Second {
return fmt.Errorf("invalid transport.idle_teardown: %v (expected >= 1s)", c.IdleTeardown)
return fmt.Errorf("invalid transport.idle_teardown: %v (expecting >= 1s)", c.IdleTeardown)
}
if c.QuiesceTime.D() < 8*time.Second {
return fmt.Errorf("invalid transport.quiescent: %v (expected >= 8s)", c.QuiesceTime)
}
if c.MaxHeaderSize > 0 && c.MaxHeaderSize < 512 {
return fmt.Errorf("invalid transport.max_header: %v (expected >= 512)", c.MaxHeaderSize)
return fmt.Errorf("invalid transport.quiescent: %v (expecting >= 8s)", c.QuiesceTime)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion ext/dsort/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (m *Manager) initStreams() error {
Extra: &transport.Extra{
Compression: config.Dsort.Compression,
Config: config,
WorkChBurst: 1024,
ChanBurst: 1024,
},
}
if err := transport.Handle(trname, m.recvShard); err != nil {
Expand Down
14 changes: 7 additions & 7 deletions transport/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ const (
func ReservedOpcode(opc int) bool { return opc >= opcFin }

const (
SizeUnknown = -1
SizeUnknown = -1 // obj size unknown (not set)

dfltSizePDU = memsys.DefaultBufSize
maxSizePDU = memsys.MaxPageSlabSize
dfltSizeHeader = memsys.PageSize
maxSizeHeader = memsys.MaxPageSlabSize
dfltSizePDU = memsys.DefaultBufSize
maxSizePDU = memsys.MaxPageSlabSize

// see also: cmn/config for (max, default) transport header sizes
)

const sizeofh = int(unsafe.Sizeof(Obj{}))
Expand All @@ -54,8 +54,8 @@ type (
SenderID string // e.g., xaction ID (optional)
IdleTeardown time.Duration // when exceeded, causes PUT to terminate (and to renew upon the very next send)
SizePDU int32 // NOTE: 0(zero): no PDUs; must be below maxSizePDU; unknown size _requires_ PDUs
MaxHdrSize int32 // overrides `dfltMaxHdr`
WorkChBurst int // overrides `dfltBurstNum`
MaxHdrSize int32 // overrides config.Transport.MaxHeaderSize
ChanBurst int // overrides config.Transport.Burst
}

// receive-side session stats indexed by session ID (see recv.go for "uid")
Expand Down
47 changes: 39 additions & 8 deletions transport/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"path"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -140,21 +141,51 @@ func newBase(client Client, dstURL, dstID string, extra *Extra) (s *streamBase)
s.time.idleTeardown = dfltIdleTeardown
}
}
debug.Assertf(s.time.idleTeardown >= dfltTick, "%v vs. %v", s.time.idleTeardown, dfltTick)
debug.Assert(s.time.idleTeardown >= dfltTick, s.time.idleTeardown, " vs ", dfltTick)
s.time.ticks = int(s.time.idleTeardown / dfltTick)

s.lid = fmt.Sprintf("s-%s%s[%d]=>%s", s.trname, sid, s.sessID, dstID)
s._lid(sid, dstID, extra)

s.maxhdr, _ = g.mm.AllocSize(_sizeHdr(extra.Config, int64(extra.MaxHdrSize)))

if extra.MaxHdrSize == 0 {
s.maxhdr, _ = g.mm.AllocSize(dfltMaxHdr)
} else {
s.maxhdr, _ = g.mm.AllocSize(int64(extra.MaxHdrSize))
cos.AssertMsg(extra.MaxHdrSize <= 0xffff, "the field is uint16") // same comment in header.go
}
s.sessST.Store(inactive) // initiate HTTP session upon the first arrival
return
}

// _lid composes the stream's log ID and stores it in s.lid.
//
// Layout: "s-<trname><sid>[<sessID>]=><dstID>". When extra.Compressed() returns true,
// the configured Transport.LZ4BlockMaxSize (rendered by cos.ToSizeIEC) is nested inside
// the session brackets: "s-<trname><sid>[<sessID>[<size>]]=><dstID>".
func (s *streamBase) _lid(sid, dstID string, extra *Extra) {
	var lid strings.Builder

	lid.WriteString("s-")
	lid.WriteString(s.trname)
	lid.WriteString(sid)

	// opening bracket followed by the session ID (closed further below)
	lid.WriteString("[" + strconv.FormatInt(s.sessID, 10))

	if extra.Compressed() {
		blockSize := cos.ToSizeIEC(int64(extra.Config.Transport.LZ4BlockMaxSize), 0)
		lid.WriteString("[" + blockSize + "]")
	}

	// close the session bracket and append the destination
	lid.WriteString("]=>" + dstID)

	s.lid = lid.String()
}

// _sizeHdr selects the transport header buffer size, in order of precedence:
//  1. a non-zero `size` passed by the caller, capped at cmn.MaxTransportHeader
//     (exceeding the cap additionally trips a debug assertion);
//  2. a non-zero config.Transport.MaxHeaderSize;
//  3. cmn.DfltTransportHeader.
//
// (used on the receive side as well)
func _sizeHdr(config *cmn.Config, size int64) int64 {
	switch {
	case size != 0:
		debug.Assert(size <= cmn.MaxTransportHeader, size)
		return min(size, cmn.MaxTransportHeader)
	case config.Transport.MaxHeaderSize != 0:
		return int64(config.Transport.MaxHeaderSize)
	default:
		return cmn.DfltTransportHeader
	}
}

func (s *streamBase) startSend(streamable fmt.Stringer) (err error) {
s.time.inSend.Store(true) // StreamCollector to postpone cleanups

Expand Down
2 changes: 1 addition & 1 deletion transport/obj_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestMain(t *testing.M) {
config.Transport.QuiesceTime = cos.Duration(10 * time.Second)
config.Log.Level = "3"
cmn.GCO.CommitUpdate(config)
sc := transport.Init(&dummyStatsTracker{}, config)
sc := transport.Init(&dummyStatsTracker{})
go sc.Run()

objmux = mux.NewServeMux()
Expand Down
16 changes: 10 additions & 6 deletions transport/recv.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,13 @@ func RxAnyStream(w http.ResponseWriter, r *http.Request) {
reader = lz4Reader
}

stats, uid, loghdr := h.stats(r, trname)
it := &iterator{handler: h, body: reader, stats: stats}
it.hbuf, _ = mm.AllocSize(dfltMaxHdr)
var (
config = cmn.GCO.Get()
stats, uid, loghdr = h.stats(r, trname)
it = &iterator{handler: h, body: reader, stats: stats}
)
debug.Assert(config.Transport.IdleTeardown > 0, "invalid config ", config.Transport)
it.hbuf, _ = mm.AllocSize(_sizeHdr(config, 0))

// receive loop
err = it.rxloop(uid, loghdr, mm)
Expand Down Expand Up @@ -236,14 +240,14 @@ func (it *iterator) rxloop(uid uint64, loghdr string, mm *memsys.MMSA) (err erro
break
}
if hlen > cap(it.hbuf) {
if hlen > maxSizeHeader {
err = fmt.Errorf("sbr1 %s: hlen %d exceeds maximum %d", loghdr, hlen, maxSizeHeader)
if hlen > cmn.MaxTransportHeader {
err = fmt.Errorf("sbr1 %s: transport header %d exceeds maximum %d", loghdr, hlen, cmn.MaxTransportHeader)
break
}
// grow
nlog.Warningf("%s: header length %d exceeds the current buffer %d", loghdr, hlen, cap(it.hbuf))
mm.Free(it.hbuf)
it.hbuf, _ = mm.AllocSize(min(int64(hlen)<<1, maxSizeHeader))
it.hbuf, _ = mm.AllocSize(min(int64(hlen)<<1, cmn.MaxTransportHeader))
}

it.stats.addOff(int64(hlen + sizeProtoHdr))
Expand Down
3 changes: 1 addition & 2 deletions transport/sendobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ func (s *Stream) initCompression(extra *Extra) {
} else {
s.lz4s.sgl = g.mm.NewSGL(cos.KiB*64, cos.KiB*64)
}
s.lid = fmt.Sprintf("%s[%d[%s]]", s.trname, s.sessID, cos.ToSizeIEC(int64(s.lz4s.blockMaxSize), 0))
}

func (s *Stream) compressed() bool { return s.lz4s != nil }
Expand Down Expand Up @@ -341,7 +340,7 @@ func (s *Stream) dryrun() {
var (
body = io.NopCloser(s)
h = &hdl{trname: s.trname}
it = iterator{handler: h, body: body, hbuf: make([]byte, dfltMaxHdr)}
it = iterator{handler: h, body: body, hbuf: make([]byte, cmn.DfltTransportHeader)}
)
for {
hlen, flags, err := it.nextProtoHdr(s.String())
Expand Down
25 changes: 11 additions & 14 deletions transport/tinit.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ import (

"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/cmn/nlog"
"github.com/NVIDIA/aistore/memsys"
)

// transport defaults
const (
dfltBurstNum = 256 // burst size (see: config.Transport.Burst)
dfltTick = time.Second
dfltTickIdle = dfltTick << 8 // (when there are no streams to _collect_)
dfltIdleTeardown = 4 * time.Second // (see config.Transport.IdleTeardown)
Expand All @@ -31,16 +31,15 @@ const (
)

type global struct {
tstats cos.StatsUpdater // subset of stats.Tracker interface, the minimum required
tstats cos.StatsUpdater // strict subset of stats.Tracker interface (the minimum required)
mm *memsys.MMSA
}

var (
g global
dfltMaxHdr int64 // memsys.PageSize or cluster-configurable (`config.Transport.MaxHeaderSize`)
g global
)

func Init(tstats cos.StatsUpdater, config *cmn.Config) *StreamCollector {
func Init(tstats cos.StatsUpdater) *StreamCollector {
g.mm = memsys.PageMM()
g.tstats = tstats

Expand All @@ -49,10 +48,6 @@ func Init(tstats cos.StatsUpdater, config *cmn.Config) *StreamCollector {
hmaps[i] = make(hmap, 4)
}

dfltMaxHdr = dfltSizeHeader
if config.Transport.MaxHeaderSize > 0 {
dfltMaxHdr = int64(config.Transport.MaxHeaderSize)
}
// real stream collector
gc = &collector{
ctrlCh: make(chan ctrl, dfltCollectChan),
Expand All @@ -67,13 +62,15 @@ func Init(tstats cos.StatsUpdater, config *cmn.Config) *StreamCollector {
}

func burst(extra *Extra) (burst int) {
if extra.WorkChBurst > 0 {
return extra.WorkChBurst
if extra.ChanBurst > 0 {
debug.Assert(extra.ChanBurst <= cmn.MaxTransportBurst, extra.ChanBurst)
return min(extra.ChanBurst, cmn.MaxTransportBurst)
}
config := extra.Config
if burst = config.Transport.Burst; burst == 0 {
burst = dfltBurstNum
if burst = extra.Config.Transport.Burst; burst == 0 {
burst = cmn.DfltTransportBurst
}

// (feat)
if a := os.Getenv("AIS_STREAM_BURST_NUM"); a != "" {
if burst64, err := strconv.ParseInt(a, 10, 0); err != nil {
nlog.Errorln(err)
Expand Down

0 comments on commit 0a54545

Please sign in to comment.