From 9dcbfad7d81671bb7bc45cdfb46771ead362eae0 Mon Sep 17 00:00:00 2001 From: Alex Aizman Date: Mon, 19 Aug 2024 18:02:14 -0400 Subject: [PATCH] (intra-cluster transport, rebalance): channels, log, more log, refactoring * make transport module's verbosity settable at runtime - redo most of the verbose logging * transport/stream collector: - increase chan size - periodically dump idle streams, if any * reb and transport/bundle modules - add begin/end log records * add open/close/abort log records * LZ4 compressed stream (state) is now a pointer * add yet another scripted test (target IDs hardcoded) * with minor refactoring Signed-off-by: Alex Aizman --- ais/daemon.go | 4 ++-- ais/htrun.go | 4 ++-- ais/test/scripts/reb-ex1.sh | 15 +++++++++++++++ cmn/cos/log.go | 6 +++--- core/lom.go | 2 +- ext/etl/api.go | 2 +- memsys/init.go | 2 +- reb/globrun.go | 32 +++++++++++++++++--------------- reb/recv.go | 4 ++-- reb/utils.go | 2 +- transport/api.go | 2 +- transport/base.go | 28 ++++++++++++++-------------- transport/bundle/dmover.go | 19 ++++++++++++++++--- transport/client_fasthttp.go | 8 ++++---- transport/client_nethttp.go | 4 ++-- transport/collect.go | 26 ++++++++++++++++++++++++-- transport/recv.go | 8 ++++---- transport/sendobj.go | 30 ++++++++++++++---------------- transport/tinit.go | 12 +++++++----- xact/xs/nsumm.go | 8 ++++++-- 20 files changed, 137 insertions(+), 81 deletions(-) create mode 100755 ais/test/scripts/reb-ex1.sh diff --git a/ais/daemon.go b/ais/daemon.go index 23cbe1b0a6..7918b1b670 100644 --- a/ais/daemon.go +++ b/ais/daemon.go @@ -321,7 +321,7 @@ func Run(version, buildTime string) int { return 0 } if e, ok := err.(*cos.ErrSignal); ok { - nlog.Infof("Terminated OK via %v", e) + nlog.Infoln("Terminated OK via", e) return e.ExitCode() } if errors.Is(err, cmn.ErrStartupTimeout) { @@ -331,7 +331,7 @@ func Run(version, buildTime string) int { // to restart the daemon if the primary gets killed or panics prior (to reaching that state) nlog.Errorln("Timed-out while starting up") } - nlog.Errorf("Terminated with err: %v", err) + nlog.Errorln("Terminated with err:", err) return 1 } diff --git a/ais/htrun.go b/ais/htrun.go index 978b1d11b2..6ad836491c 100644 --- a/ais/htrun.go +++ b/ais/htrun.go @@ -1365,14 +1365,14 @@ func (h *htrun) writeErrf(w http.ResponseWriter, r *http.Request, format string, func (h *htrun) writeErrURL(w http.ResponseWriter, r *http.Request) { if r.URL.Scheme != "" { - h.writeErrf(w, r, "request '%s %s://%s': invalid URL path", r.Method, r.URL.Scheme, r.URL.Path) + h.writeErrf(w, r, "request '%s %s://%s' from %s: invalid URL path", r.Method, r.URL.Scheme, r.URL.Path, r.RemoteAddr) return } // ignore GET /favicon.ico by Browsers if r.URL.Path == "/favicon.ico" || r.URL.Path == "favicon.ico" { return } - h.writeErrf(w, r, "invalid request URI: '%s %s'", r.Method, r.RequestURI) + h.writeErrf(w, r, "invalid request URI: '%s %s' from %s", r.Method, r.RequestURI, r.RemoteAddr) } func (h *htrun) writeErrAct(w http.ResponseWriter, r *http.Request, action string) { diff --git a/ais/test/scripts/reb-ex1.sh b/ais/test/scripts/reb-ex1.sh new file mode 100755 index 0000000000..945d5bcaf4 --- /dev/null +++ b/ais/test/scripts/reb-ex1.sh @@ -0,0 +1,15 @@ +#!/bin/bash +for i in {1..6}; do + ais cluster add-remove-nodes shutdown t[Qxut8086] --no-rebalance --yes + sleep 3 + aisnode -config=/root/.ais6/ais.json -local_config=/root/.ais6/ais_local.json -role=target & + sleep 3 + ais cluster add-remove-nodes stop-maintenance t[Qxut8086] --yes + sleep 3 + ais cluster add-remove-nodes shutdown t[RVst8090] --no-rebalance --yes + sleep 3 + aisnode -config=/root/.ais10/ais.json -local_config=/root/.ais10/ais_local.json -role=target & + sleep 3 + ais cluster add-remove-nodes stop-maintenance t[RVst8090] --yes + sleep 3 +done diff --git a/cmn/cos/log.go b/cmn/cos/log.go index 1665f331c6..cca21daf05 100644 --- a/cmn/cos/log.go +++ b/cmn/cos/log.go @@ -12,11 +12,11 @@ import ( "github.com/NVIDIA/aistore/cmn/nlog" ) -func Infof(format string, a ...any) { +func Infoln(a ...any) { if flag.Parsed() { - nlog.InfoDepth(1, fmt.Sprintf(format, a...)) + nlog.InfoDepth(1, a...) } else { - fmt.Printf(format+"\n", a...) + fmt.Println(a...) } } diff --git a/core/lom.go b/core/lom.go index b46b2275c5..54c4712745 100644 --- a/core/lom.go +++ b/core/lom.go @@ -503,7 +503,7 @@ func (lom *LOM) Recache() { } func (lom *LOM) _collide(lmd *lmeta) { - if cmn.Rom.FastV(4, cos.SmoduleCore) || (lom.digest%17) == 5 { + if cmn.Rom.FastV(4, cos.SmoduleCore) || lom.digest&0xf == 5 { nlog.InfoDepth(1, LcacheCollisionCount, lom.digest, "[", *lmd.uname, "]", *lom.md.uname, lom.Cname()) } g.tstats.Inc(LcacheCollisionCount) diff --git a/ext/etl/api.go b/ext/etl/api.go index bdf6d822f3..c04d0a0f60 100644 --- a/ext/etl/api.go +++ b/ext/etl/api.go @@ -219,7 +219,7 @@ func (m *InitMsgBase) validate(detail string) error { // NOTE: default comm-type if m.CommType() == "" { - cos.Infof("Warning: empty comm-type, defaulting to %q", Hpush) + cos.Infoln("Warning: empty comm-type, defaulting to", Hpush) m.CommTypeX = Hpush } // NOTE: default timeout diff --git a/memsys/init.go b/memsys/init.go index 60d5af8dd3..8eb69858d5 100644 --- a/memsys/init.go +++ b/memsys/init.go @@ -70,7 +70,7 @@ func NewMMSA(name string, silent bool) (mem *MMSA, err error) { } err = mem.Init(0) if !silent { - cos.Infof("%s", mem.Str(&mem.mem)) + cos.Infoln(mem.Str(&mem.mem)) } return } diff --git a/reb/globrun.go b/reb/globrun.go index b241126f4d..34aa8c28b8 100644 --- a/reb/globrun.go +++ b/reb/globrun.go @@ -445,7 +445,7 @@ func (reb *Reb) initRenew(rargs *rebArgs, notif *xact.NotifXact, logHdr string, reb.endStreams(err) xctn.Abort(err) reb.mu.Unlock() - nlog.Errorf("FATAL: %v, WRITE: %v", fatalErr, writeErr) + nlog.Errorln("FATAL:", fatalErr, "WRITE:", writeErr) return false } @@ -454,7 +454,7 @@ func (reb *Reb) initRenew(rargs *rebArgs, notif *xact.NotifXact, logHdr string, reb.stages.cleanup() reb.mu.Unlock() - nlog.Infof("%s: running %s", reb.logHdr(rargs.id, rargs.smap), reb.xctn()) + nlog.Infoln(reb.logHdr(rargs.id, rargs.smap), "- running", reb.xctn()) return true } @@ -482,10 +482,10 @@ func (reb *Reb) abortStreams() { } func (reb *Reb) endStreams(err error) { - if reb.stages.stage.CAS(rebStageFin, rebStageFinStreams) { - reb.dm.Close(err) - reb.pushes.Close(true) - } + swapped := reb.stages.stage.CAS(rebStageFin, rebStageFinStreams) + debug.Assert(swapped) + reb.dm.Close(err) + reb.pushes.Close(err == nil) } // when at least one bucket has EC enabled @@ -578,7 +578,7 @@ func (reb *Reb) rebWaitAck(rargs *rebArgs) (errCnt int) { for _, lom := range lomack.q { tsi, err := smap.HrwHash2T(lom.Digest()) if err == nil { - nlog.Infof("waiting for %s ACK from %s", lom, tsi.StringEx()) + nlog.Infoln("waiting for", lom.String(), "ACK from", tsi.StringEx()) logged = true break } @@ -587,7 +587,7 @@ func (reb *Reb) rebWaitAck(rargs *rebArgs) (errCnt int) { } lomack.mu.Unlock() if err := xreb.AbortErr(); err != nil { - nlog.Infof("%s: abort wait-ack (%v)", logHdr, err) + nlog.Infoln(logHdr, "abort wait-ack:", err) return } } @@ -597,7 +597,7 @@ func (reb *Reb) rebWaitAck(rargs *rebArgs) (errCnt int) { } nlog.Warningf("%s: waiting for %d ACKs", logHdr, cnt) if err := xreb.AbortedAfter(sleep); err != nil { - nlog.Infof("%s: abort wait-ack (%v)", logHdr, err) + nlog.Infoln(logHdr, "abort wait-ack:", err) return } @@ -707,10 +707,8 @@ func (reb *Reb) _aborted(rargs *rebArgs) (yes bool) { } func (reb *Reb) fini(rargs *rebArgs, logHdr string, err error) { - var stats core.Stats - if cmn.Rom.FastV(4, cos.SmoduleReb) { - nlog.Infof("finishing rebalance (reb_args: %s)", reb.logHdr(rargs.id, rargs.smap)) - } + nlog.Infoln(logHdr, "fini") + // prior to closing the streams if q := reb.quiesce(rargs, rargs.config.Transport.QuiesceTime.D(), reb.nodesQuiescent); q != core.QuiAborted { if errM := fs.RemoveMarker(fname.RebalanceMarker); errM == nil { @@ -720,7 +718,11 @@ func (reb *Reb) fini(rargs *rebArgs, logHdr string, err error) { } reb.endStreams(err) reb.filterGFN.Reset() - xreb := reb.xctn() + + var ( + stats core.Stats + xreb = reb.xctn() + ) xreb.ToStats(&stats) if stats.Objs > 0 || stats.OutObjs > 0 || stats.InObjs > 0 { s, e := jsoniter.MarshalIndent(&stats, "", " ") @@ -735,7 +737,7 @@ func (reb *Reb) fini(rargs *rebArgs, logHdr string, err error) { if !xreb.Finished() { xreb.Finish() } - nlog.Infof("%s: done (%s)", logHdr, xreb) + nlog.Infoln(logHdr, "done", xreb.String()) } ////////////////////////////// diff --git a/reb/recv.go b/reb/recv.go index b57aab0a2e..eee6051892 100644 --- a/reb/recv.go +++ b/reb/recv.go @@ -140,7 +140,7 @@ func (reb *Reb) recvObjRegular(hdr *transport.ObjHdr, smap *meta.Smap, unpacker return err } if ack.rebID != reb.RebID() { - nlog.Warningf("received %s: %s", hdr.Cname(), reb.warnID(ack.rebID, ack.daemonID)) + nlog.Warningln("received", hdr.Cname(), reb.warnID(ack.rebID, ack.daemonID)) return nil } tsid := ack.daemonID // the sender @@ -209,7 +209,7 @@ func (reb *Reb) recvRegularAck(hdr *transport.ObjHdr, unpacker *cos.ByteUnpack) return err } if ack.rebID != reb.rebID.Load() { - nlog.Warningf("ACK from %s: %s", ack.daemonID, reb.warnID(ack.rebID, ack.daemonID)) + nlog.Warningln("ACK from", ack.daemonID, reb.warnID(ack.rebID, ack.daemonID)) return nil } diff --git a/reb/utils.go b/reb/utils.go index 47ddb613a7..37ddba02d9 100644 --- a/reb/utils.go +++ b/reb/utils.go @@ -100,7 +100,7 @@ func (reb *Reb) changeStage(newStage uint32) { hdr.Opaque = reb.encodeStageNtfn(&req) // second, notify all if err := reb.pushes.Send(&transport.Obj{Hdr: hdr}, nil); err != nil { - nlog.Warningf("Failed to broadcast ack %s: %v", stages[newStage], err) + nlog.Warningln("Failed to push new-stage notif:", req.rebID, stages[newStage], "err:", err) } } diff --git a/transport/api.go b/transport/api.go index 7957e744e8..6ebcc94801 100644 --- a/transport/api.go +++ b/transport/api.go @@ -159,7 +159,7 @@ func (s *Stream) Send(obj *Obj) (err error) { } s.workCh <- obj - if l, c := len(s.workCh), cap(s.workCh); l > c/2 { + if l, c := len(s.workCh), cap(s.workCh); l > (c - c>>2) { runtime.Gosched() // poor man's throttle if l == c { s.chanFull.Inc() diff --git a/transport/base.go b/transport/base.go index cf4c2129ba..baac100992 100644 --- a/transport/base.go +++ b/transport/base.go @@ -1,7 +1,7 @@ // Package transport provides long-lived http/tcp connections for // intra-cluster communications (see README for details and usage example). /* - * Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved. */ package transport @@ -168,8 +168,8 @@ func (s *streamBase) startSend(streamable fmt.Stringer) (err error) { if s.sessST.CAS(inactive, active) { s.postCh <- struct{}{} - if verbose { - nlog.Infof("%s: inactive => active", s) + if cmn.Rom.FastV(5, cos.SmoduleTransport) { + nlog.Infoln(s.String(), "inactive => active") } } return @@ -212,21 +212,21 @@ func (s *streamBase) isNextReq() (reason string) { for { select { case <-s.lastCh.Listen(): - if verbose { - nlog.Infof("%s: end-of-stream", s) + if cmn.Rom.FastV(5, cos.SmoduleTransport) { + nlog.Infoln(s.String(), "end-of-stream") } reason = endOfStream return case <-s.stopCh.Listen(): - if verbose { - nlog.Infof("%s: stopped", s) + if cmn.Rom.FastV(5, cos.SmoduleTransport) { + nlog.Infoln(s.String(), "stopped") } reason = reasonStopped return case <-s.postCh: s.sessST.Store(active) - if verbose { - nlog.Infof("%s: active <- posted", s) + if cmn.Rom.FastV(5, cos.SmoduleTransport) { + nlog.Infoln(s.String(), "active <- posted") } return } @@ -235,9 +235,8 @@ func (s *streamBase) isNextReq() (reason string) { func (s *streamBase) deactivate() (n int, err error) { err = io.EOF - if verbose { - num := s.stats.Num.Load() - nlog.Infof("%s: connection teardown (%d/%d)", s, s.numCur, num) + if cmn.Rom.FastV(5, cos.SmoduleTransport) { + nlog.Infoln(s.String(), "connection teardown: [", s.numCur, s.stats.Num.Load(), "]") } return } @@ -260,7 +259,7 @@ func (s *streamBase) sendLoop(dryrun bool) { break } retried = true - nlog.Errorf("%s: %v - retrying...", s, errR) + nlog.Errorln(s.String(), "err: ", errR, "- retrying...") time.Sleep(connErrWait) } } @@ -288,8 +287,9 @@ func (s *streamBase) sendLoop(dryrun bool) { // cleanup s.streamer.abortPending(err, false /*completions*/) + verbose := cmn.Rom.FastV(5, cos.SmoduleTransport) if cnt := s.chanFull.Load(); (cnt >= 10 && cnt <= 20) || (cnt > 0 && verbose) { - nlog.Errorln(cos.ErrWorkChanFull, s.lid, "cnt", cnt) + nlog.Errorln(s.String(), cos.ErrWorkChanFull, "cnt:", cnt) } } diff --git a/transport/bundle/dmover.go b/transport/bundle/dmover.go index 6f00f3a7c0..93a6609a52 100644 --- a/transport/bundle/dmover.go +++ b/transport/bundle/dmover.go @@ -152,7 +152,9 @@ func (dm *DataMover) Open() { } dm.ack.streams = New(dm.ack.client, ackArgs) } + dm.stage.opened.Store(true) + nlog.Infoln(dm.String(), "is open") } func (dm *DataMover) String() string { @@ -179,18 +181,29 @@ func (dm *DataMover) Quiesce(d time.Duration) core.QuiRes { func (dm *DataMover) Close(err error) { if dm == nil { + nlog.Errorln("Warning: DM is ") return } if !dm.stage.opened.CAS(true, false) { + nlog.Errorln("Warning:", dm.String(), "not open") return } if err == nil && dm.xctn != nil && dm.xctn.IsAborted() { err = dm.xctn.AbortErr() } // nil: close gracefully via `fin`, otherwise abort - dm.data.streams.Close(err == nil) - if dm.useACKs() { - dm.ack.streams.Close(err == nil) + if err == nil { + dm.data.streams.Close(true) + if dm.useACKs() { + dm.ack.streams.Close(true) + } + nlog.Infoln(dm.String(), "closed") + } else { + dm.data.streams.Close(false) + if dm.useACKs() { + dm.ack.streams.Close(false) + } + nlog.Infoln(dm.String(), "aborted:", err) } } diff --git a/transport/client_fasthttp.go b/transport/client_fasthttp.go index e792aec4c0..c7fe3c0a44 100644 --- a/transport/client_fasthttp.go +++ b/transport/client_fasthttp.go @@ -75,10 +75,10 @@ func (s *streamBase) do(body io.Reader) (err error) { // do err = s.client.Do(req, resp) if err != nil { - if verbose { - nlog.Errorf("%s: Error [%v]", s, err) + if cmn.Rom.FastV(5, cos.SmoduleTransport) { + nlog.Errorln(s.String(), "err:", err) } - return + return err } // handle response & cleanup resp.BodyWriteTo(io.Discard) @@ -87,5 +87,5 @@ func (s *streamBase) do(body io.Reader) (err error) { if s.streamer.compressed() { s.streamer.resetCompression() } - return + return nil } diff --git a/transport/client_nethttp.go b/transport/client_nethttp.go index 68fab30d80..53bc68c373 100644 --- a/transport/client_nethttp.go +++ b/transport/client_nethttp.go @@ -71,8 +71,8 @@ func (s *streamBase) do(body io.Reader) (err error) { response, err = s.client.Do(request) if err != nil { - if verbose { - nlog.Errorf("%s: Error [%v]", s, err) + if cmn.Rom.FastV(5, cos.SmoduleTransport) { + nlog.Errorln(s.String(), "err:", err) } return } diff --git a/transport/collect.go b/transport/collect.go index 581d2099e0..67959dd636 100644 --- a/transport/collect.go +++ b/transport/collect.go @@ -12,6 +12,7 @@ import ( "github.com/NVIDIA/aistore/cmn/atomic" "github.com/NVIDIA/aistore/cmn/cos" "github.com/NVIDIA/aistore/cmn/debug" + "github.com/NVIDIA/aistore/cmn/mono" "github.com/NVIDIA/aistore/cmn/nlog" ) @@ -47,8 +48,8 @@ var _ cos.Runner = (*StreamCollector)(nil) func (*StreamCollector) Name() string { return "stream-collector" } func (sc *StreamCollector) Run() error { - cos.Infof("Intra-cluster networking: %s client", whichClient()) - cos.Infof("Starting %s", sc.Name()) + cos.Infoln("Intra-cluster networking:", whichClient(), "client") + cos.Infoln("Starting", sc.Name()) gc.ticker = time.NewTicker(dfltTickIdle) gc.none.Store(true) gc.run() @@ -62,10 +63,29 @@ func (sc *StreamCollector) Stop(err error) { } func (gc *collector) run() { + var prev int64 for { select { case <-gc.ticker.C: gc.do() + + // periodic log + if !gc.none.Load() { + now := mono.NanoTime() + if time.Duration(now-prev) >= dfltCollectLog { + var s *streamBase + for _, s = range gc.streams { + break + } + nlog.Infoln("total:", len(gc.streams), "one:", s.String()) + prev = now + + if l, c := len(gc.ctrlCh), cap(gc.ctrlCh); l > (c - c>>2) { + nlog.Errorln("control channel full", l, c) // compare w/ cos.ErrWorkChanFull + } + } + } + case ctrl, ok := <-gc.ctrlCh: if !ok { return @@ -78,6 +98,7 @@ func (gc *collector) run() { heap.Push(gc, s) if gc.none.CAS(true, false) { gc.ticker.Reset(dfltTick) + prev = mono.NanoTime() } } else if ok { heap.Remove(gc, s.time.index) @@ -156,6 +177,7 @@ func (gc *collector) do() { gc.ticker.Reset(dfltTickIdle) debug.Assert(!gc.none.Load()) gc.none.Store(true) + nlog.Infoln("none") } s.streamer.closeAndFree() s.streamer.abortPending(err, true /*completions*/) diff --git a/transport/recv.go b/transport/recv.go index 2ba848ae6b..4a9f390a7f 100644 --- a/transport/recv.go +++ b/transport/recv.go @@ -103,8 +103,8 @@ func RxAnyStream(w http.ResponseWriter, r *http.Request) { // at the lowest level (and with no handler and its rxObj cb). // if _, ok := err.(*errAlreadyClosedTrname); ok { - if verbose { - nlog.Errorln(err) + if cmn.Rom.FastV(5, cos.SmoduleTransport) { + nlog.Errorln(trname, "err:", err) } } else { cmn.WriteErr(w, r, err, 0) @@ -169,8 +169,8 @@ func (h *hdlExtra) stats(r *http.Request, trname string) (rxStats, uint64, strin xxh, _ := UID2SessID(uid) loghdr := fmt.Sprintf("%s[%d:%d]", h.trname, xxh, sessID) - if verbose { - nlog.Infof("%s: start-of-stream from %s", loghdr, r.RemoteAddr) + if cmn.Rom.FastV(5, cos.SmoduleTransport) { + nlog.Infoln(loghdr, "start-of-stream from", r.RemoteAddr) } return statsif.(rxStats), uid, loghdr } diff --git a/transport/sendobj.go b/transport/sendobj.go index 23d2a2fb09..75d09b70c4 100644 --- a/transport/sendobj.go +++ b/transport/sendobj.go @@ -24,8 +24,8 @@ type ( workCh chan *Obj // aka SQ: next object to stream cmplCh chan cmpl // aka SCQ; note that SQ and SCQ together form a FIFO callback ObjSentCB // to free SGLs, close files, etc. + lz4s *lz4Stream sendoff sendoff - lz4s lz4Stream streamBase } lz4Stream struct { @@ -85,6 +85,7 @@ func (s *Stream) terminate(err error, reason string) (actReason string, actErr e } func (s *Stream) initCompression(extra *Extra) { + s.lz4s = &lz4Stream{} s.lz4s.s = s s.lz4s.blockMaxSize = int(extra.Config.Transport.LZ4BlockMaxSize) s.lz4s.frameChecksum = extra.Config.Transport.LZ4FrameChecksum @@ -96,7 +97,7 @@ func (s *Stream) initCompression(extra *Extra) { s.lid = fmt.Sprintf("%s[%d[%s]]", s.trname, s.sessID, cos.ToSizeIEC(int64(s.lz4s.blockMaxSize), 0)) } -func (s *Stream) compressed() bool { return s.lz4s.s == s } +func (s *Stream) compressed() bool { return s.lz4s != nil } func (s *Stream) usePDU() bool { return s.pdu != nil } func (s *Stream) resetCompression() { @@ -171,7 +172,7 @@ func (s *Stream) doRequest() error { s.lz4s.zw.Header.BlockChecksum = false s.lz4s.zw.Header.NoChecksum = !s.lz4s.frameChecksum s.lz4s.zw.Header.BlockMaxSize = s.lz4s.blockMaxSize - return s.do(&s.lz4s) + return s.do(s.lz4s) } // as io.Reader @@ -234,8 +235,8 @@ repeat: s.sendoff.ins = inHdr return s.sendHdr(b) case <-s.stopCh.Listen(): - if verbose { - nlog.Infof("%s: stopped (%d/%d)", s, s.numCur, s.stats.Num.Load()) + if cmn.Rom.FastV(5, cos.SmoduleTransport) { + nlog.Infoln(s.String(), "stopped [", s.numCur, s.stats.Num.Load(), "]") } err = io.EOF return @@ -250,21 +251,18 @@ func (s *Stream) sendHdr(b []byte) (n int, err error) { } debug.Assert(s.sendoff.off == int64(len(s.header))) s.stats.Offset.Add(s.sendoff.off) - if verbose { - num := s.stats.Num.Load() - nlog.Infof("%s: hlen=%d (%d/%d)", s, s.sendoff.off, s.numCur, num) - } + obj := &s.sendoff.obj if s.usePDU() && !obj.IsHeaderOnly() { s.sendoff.ins = inPDU } else { s.sendoff.ins = inData } + if cmn.Rom.FastV(5, cos.SmoduleTransport) && s.numCur&0x3f == 2 { + nlog.Infoln(s.String(), obj.Hdr.Cname(), "[", s.numCur, s.stats.Num.Load(), "]") + } s.sendoff.off = 0 if obj.Hdr.isFin() { - if verbose { - nlog.Infof("%s: sent last", s) - } err = io.EOF s.lastCh.Close() } @@ -320,8 +318,8 @@ func (s *Stream) eoObj(err error) { s.stats.Size.Add(objSize) s.numCur++ s.stats.Num.Inc() - if verbose { - nlog.Infof("%s: sent %s (%d/%d)", s, obj, s.numCur, s.stats.Num.Load()) + if cmn.Rom.FastV(5, cos.SmoduleTransport) && s.numCur&0x3f == 3 { + nlog.Infoln(s.String(), obj.Hdr.Cname(), "[", s.numCur, s.stats.Num.Load(), "]") } // target stats @@ -396,8 +394,8 @@ func (s *Stream) closeAndFree() { func (s *Stream) idleTick() { if len(s.workCh) == 0 && s.sessST.CAS(active, inactive) { s.workCh <- &Obj{Hdr: ObjHdr{Opcode: opcIdleTick}} - if verbose { - nlog.Infof("%s: active => inactive", s) + if cmn.Rom.FastV(5, cos.SmoduleTransport) { + nlog.Infoln(s.String(), "active => inactive") } } } diff --git a/transport/tinit.go b/transport/tinit.go index ce295a3f47..0af56f82ab 100644 --- a/transport/tinit.go +++ b/transport/tinit.go @@ -19,12 +19,17 @@ import ( // transport defaults const ( - dfltBurstNum = 128 // burst size (see: config.Transport.Burst) + dfltBurstNum = 256 // burst size (see: config.Transport.Burst) dfltTick = time.Second dfltTickIdle = dfltTick << 8 // (when there are no streams to _collect_) dfltIdleTeardown = 4 * time.Second // (see config.Transport.IdleTeardown) ) +const ( + dfltCollectLog = 10 * time.Minute + dfltCollectChan = 256 +) + type global struct { tstats cos.StatsUpdater // subset of stats.Tracker interface, the minimum required mm *memsys.MMSA @@ -33,12 +38,9 @@ type global struct { var ( g global dfltMaxHdr int64 // memsys.PageSize or cluster-configurable (`config.Transport.MaxHeaderSize`) - verbose bool ) func Init(tstats cos.StatsUpdater, config *cmn.Config) *StreamCollector { - verbose = cmn.Rom.FastV(5 /*super-verbose*/, cos.SmoduleTransport) - g.mm = memsys.PageMM() g.tstats = tstats @@ -53,7 +55,7 @@ func Init(tstats cos.StatsUpdater, config *cmn.Config) *StreamCollector { } // real stream collector gc = &collector{ - ctrlCh: make(chan ctrl, 64), + ctrlCh: make(chan ctrl, dfltCollectChan), streams: make(map[string]*streamBase, 64), heap: make([]*streamBase, 0, 64), // min-heap sorted by stream.time.ticks } diff --git a/xact/xs/nsumm.go b/xact/xs/nsumm.go index b509b3b71d..03da817cd6 100644 --- a/xact/xs/nsumm.go +++ b/xact/xs/nsumm.go @@ -150,11 +150,15 @@ func newSumm(p *nsummFactory) (r *XactNsumm, err error) { func (r *XactNsumm) Run(started *sync.WaitGroup) { started.Done() - nlog.Infoln(r.Name(), r.p.Bck.Cname("")) - var ( + bname string rwg, lwg cos.WG ) + if !r.p.Bck.IsEmpty() { + bname = r.p.Bck.Cname("") + } + nlog.Infoln(r.Name(), bname) + // (I) remote if r.listRemote { // _this_ target to list-and-summ remote pages, in parallel