Skip to content

Commit

Permalink
(intra-cluster transport, rebalance): channels, log, more log, refactoring
Browse files Browse the repository at this point in the history

* make transport module's verbosity settable at runtime
  - redo most of the verbose logging
* transport/stream collector:
  - increase chan size
  - periodically dump idle streams, if any
* reb and transport/bundle modules
  - add begin/end log records
* add open/close/abort log records
* LZ4 compressed stream (state) is now a pointer
* add yet another scripted test (target IDs hardcoded)
* with minor refactoring

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Aug 19, 2024
1 parent e743e01 commit 9dcbfad
Show file tree
Hide file tree
Showing 20 changed files with 137 additions and 81 deletions.
4 changes: 2 additions & 2 deletions ais/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func Run(version, buildTime string) int {
return 0
}
if e, ok := err.(*cos.ErrSignal); ok {
nlog.Infof("Terminated OK via %v", e)
nlog.Infoln("Terminated OK via", e)
return e.ExitCode()
}
if errors.Is(err, cmn.ErrStartupTimeout) {
Expand All @@ -331,7 +331,7 @@ func Run(version, buildTime string) int {
// to restart the daemon if the primary gets killed or panics prior (to reaching that state)
nlog.Errorln("Timed-out while starting up")
}
nlog.Errorf("Terminated with err: %v", err)
nlog.Errorln("Terminated with err:", err)
return 1
}

Expand Down
4 changes: 2 additions & 2 deletions ais/htrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -1365,14 +1365,14 @@ func (h *htrun) writeErrf(w http.ResponseWriter, r *http.Request, format string,

func (h *htrun) writeErrURL(w http.ResponseWriter, r *http.Request) {
if r.URL.Scheme != "" {
h.writeErrf(w, r, "request '%s %s://%s': invalid URL path", r.Method, r.URL.Scheme, r.URL.Path)
h.writeErrf(w, r, "request '%s %s://%s' from %s: invalid URL path", r.Method, r.URL.Scheme, r.URL.Path, r.RemoteAddr)
return
}
// ignore GET /favicon.ico by Browsers
if r.URL.Path == "/favicon.ico" || r.URL.Path == "favicon.ico" {
return
}
h.writeErrf(w, r, "invalid request URI: '%s %s'", r.Method, r.RequestURI)
h.writeErrf(w, r, "invalid request URI: '%s %s' from %s", r.Method, r.RequestURI, r.RemoteAddr)
}

func (h *htrun) writeErrAct(w http.ResponseWriter, r *http.Request, action string) {
Expand Down
15 changes: 15 additions & 0 deletions ais/test/scripts/reb-ex1.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/bin/bash
#
# Scripted rebalance exercise. Target IDs and config directories are hardcoded
# for one specific deployment. Six times over, each of the two targets is taken
# through: shutdown (with --no-rebalance) => start aisnode => stop-maintenance,
# with a 3-second pause after every step.

# Run one shutdown/start/stop-maintenance cycle for a single target.
#   $1 - target node ID, without the t[...] wrapper
#   $2 - directory containing that target's ais.json and ais_local.json
cycle_target() {
  local tid="$1" confdir="$2"

  # The node argument is quoted on purpose: unquoted, t[...] is a glob bracket
  # expression, and bash would replace it with any matching file name found in
  # the current directory (e.g. "t8" or "tx") instead of passing the node ID.
  ais cluster add-remove-nodes shutdown "t[${tid}]" --no-rebalance --yes
  sleep 3
  # Launched in the background; the script does not wait for the process.
  aisnode -config="${confdir}/ais.json" -local_config="${confdir}/ais_local.json" -role=target &
  sleep 3
  ais cluster add-remove-nodes stop-maintenance "t[${tid}]" --yes
  sleep 3
}

for i in {1..6}; do
  cycle_target Qxut8086 /root/.ais6
  cycle_target RVst8090 /root/.ais10
done
6 changes: 3 additions & 3 deletions cmn/cos/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import (
"github.com/NVIDIA/aistore/cmn/nlog"
)

func Infof(format string, a ...any) {
func Infoln(a ...any) {
if flag.Parsed() {
nlog.InfoDepth(1, fmt.Sprintf(format, a...))
nlog.InfoDepth(1, a...)
} else {
fmt.Printf(format+"\n", a...)
fmt.Println(a...)
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/lom.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ func (lom *LOM) Recache() {
}

func (lom *LOM) _collide(lmd *lmeta) {
if cmn.Rom.FastV(4, cos.SmoduleCore) || (lom.digest%17) == 5 {
if cmn.Rom.FastV(4, cos.SmoduleCore) || lom.digest&0xf == 5 {
nlog.InfoDepth(1, LcacheCollisionCount, lom.digest, "[", *lmd.uname, "]", *lom.md.uname, lom.Cname())
}
g.tstats.Inc(LcacheCollisionCount)
Expand Down
2 changes: 1 addition & 1 deletion ext/etl/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (m *InitMsgBase) validate(detail string) error {

// NOTE: default comm-type
if m.CommType() == "" {
cos.Infof("Warning: empty comm-type, defaulting to %q", Hpush)
cos.Infoln("Warning: empty comm-type, defaulting to", Hpush)
m.CommTypeX = Hpush
}
// NOTE: default timeout
Expand Down
2 changes: 1 addition & 1 deletion memsys/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func NewMMSA(name string, silent bool) (mem *MMSA, err error) {
}
err = mem.Init(0)
if !silent {
cos.Infof("%s", mem.Str(&mem.mem))
cos.Infoln(mem.Str(&mem.mem))
}
return
}
Expand Down
32 changes: 17 additions & 15 deletions reb/globrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func (reb *Reb) initRenew(rargs *rebArgs, notif *xact.NotifXact, logHdr string,
reb.endStreams(err)
xctn.Abort(err)
reb.mu.Unlock()
nlog.Errorf("FATAL: %v, WRITE: %v", fatalErr, writeErr)
nlog.Errorln("FATAL:", fatalErr, "WRITE:", writeErr)
return false
}

Expand All @@ -454,7 +454,7 @@ func (reb *Reb) initRenew(rargs *rebArgs, notif *xact.NotifXact, logHdr string,
reb.stages.cleanup()

reb.mu.Unlock()
nlog.Infof("%s: running %s", reb.logHdr(rargs.id, rargs.smap), reb.xctn())
nlog.Infoln(reb.logHdr(rargs.id, rargs.smap), "- running", reb.xctn())
return true
}

Expand Down Expand Up @@ -482,10 +482,10 @@ func (reb *Reb) abortStreams() {
}

func (reb *Reb) endStreams(err error) {
if reb.stages.stage.CAS(rebStageFin, rebStageFinStreams) {
reb.dm.Close(err)
reb.pushes.Close(true)
}
swapped := reb.stages.stage.CAS(rebStageFin, rebStageFinStreams)
debug.Assert(swapped)
reb.dm.Close(err)
reb.pushes.Close(err == nil)
}

// when at least one bucket has EC enabled
Expand Down Expand Up @@ -578,7 +578,7 @@ func (reb *Reb) rebWaitAck(rargs *rebArgs) (errCnt int) {
for _, lom := range lomack.q {
tsi, err := smap.HrwHash2T(lom.Digest())
if err == nil {
nlog.Infof("waiting for %s ACK from %s", lom, tsi.StringEx())
nlog.Infoln("waiting for", lom.String(), "ACK from", tsi.StringEx())
logged = true
break
}
Expand All @@ -587,7 +587,7 @@ func (reb *Reb) rebWaitAck(rargs *rebArgs) (errCnt int) {
}
lomack.mu.Unlock()
if err := xreb.AbortErr(); err != nil {
nlog.Infof("%s: abort wait-ack (%v)", logHdr, err)
nlog.Infoln(logHdr, "abort wait-ack:", err)
return
}
}
Expand All @@ -597,7 +597,7 @@ func (reb *Reb) rebWaitAck(rargs *rebArgs) (errCnt int) {
}
nlog.Warningf("%s: waiting for %d ACKs", logHdr, cnt)
if err := xreb.AbortedAfter(sleep); err != nil {
nlog.Infof("%s: abort wait-ack (%v)", logHdr, err)
nlog.Infoln(logHdr, "abort wait-ack:", err)
return
}

Expand Down Expand Up @@ -707,10 +707,8 @@ func (reb *Reb) _aborted(rargs *rebArgs) (yes bool) {
}

func (reb *Reb) fini(rargs *rebArgs, logHdr string, err error) {
var stats core.Stats
if cmn.Rom.FastV(4, cos.SmoduleReb) {
nlog.Infof("finishing rebalance (reb_args: %s)", reb.logHdr(rargs.id, rargs.smap))
}
nlog.Infoln(logHdr, "fini")

// prior to closing the streams
if q := reb.quiesce(rargs, rargs.config.Transport.QuiesceTime.D(), reb.nodesQuiescent); q != core.QuiAborted {
if errM := fs.RemoveMarker(fname.RebalanceMarker); errM == nil {
Expand All @@ -720,7 +718,11 @@ func (reb *Reb) fini(rargs *rebArgs, logHdr string, err error) {
}
reb.endStreams(err)
reb.filterGFN.Reset()
xreb := reb.xctn()

var (
stats core.Stats
xreb = reb.xctn()
)
xreb.ToStats(&stats)
if stats.Objs > 0 || stats.OutObjs > 0 || stats.InObjs > 0 {
s, e := jsoniter.MarshalIndent(&stats, "", " ")
Expand All @@ -735,7 +737,7 @@ func (reb *Reb) fini(rargs *rebArgs, logHdr string, err error) {
if !xreb.Finished() {
xreb.Finish()
}
nlog.Infof("%s: done (%s)", logHdr, xreb)
nlog.Infoln(logHdr, "done", xreb.String())
}

//////////////////////////////
Expand Down
4 changes: 2 additions & 2 deletions reb/recv.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (reb *Reb) recvObjRegular(hdr *transport.ObjHdr, smap *meta.Smap, unpacker
return err
}
if ack.rebID != reb.RebID() {
nlog.Warningf("received %s: %s", hdr.Cname(), reb.warnID(ack.rebID, ack.daemonID))
nlog.Warningln("received", hdr.Cname(), reb.warnID(ack.rebID, ack.daemonID))
return nil
}
tsid := ack.daemonID // the sender
Expand Down Expand Up @@ -209,7 +209,7 @@ func (reb *Reb) recvRegularAck(hdr *transport.ObjHdr, unpacker *cos.ByteUnpack)
return err
}
if ack.rebID != reb.rebID.Load() {
nlog.Warningf("ACK from %s: %s", ack.daemonID, reb.warnID(ack.rebID, ack.daemonID))
nlog.Warningln("ACK from", ack.daemonID, reb.warnID(ack.rebID, ack.daemonID))
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion reb/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (reb *Reb) changeStage(newStage uint32) {
hdr.Opaque = reb.encodeStageNtfn(&req)
// second, notify all
if err := reb.pushes.Send(&transport.Obj{Hdr: hdr}, nil); err != nil {
nlog.Warningf("Failed to broadcast ack %s: %v", stages[newStage], err)
nlog.Warningln("Failed to push new-stage notif:", req.rebID, stages[newStage], "err:", err)
}
}

Expand Down
2 changes: 1 addition & 1 deletion transport/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (s *Stream) Send(obj *Obj) (err error) {
}

s.workCh <- obj
if l, c := len(s.workCh), cap(s.workCh); l > c/2 {
if l, c := len(s.workCh), cap(s.workCh); l > (c - c>>2) {
runtime.Gosched() // poor man's throttle
if l == c {
s.chanFull.Inc()
Expand Down
28 changes: 14 additions & 14 deletions transport/base.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Package transport provides long-lived http/tcp connections for
// intra-cluster communications (see README for details and usage example).
/*
* Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
*/
package transport

Expand Down Expand Up @@ -168,8 +168,8 @@ func (s *streamBase) startSend(streamable fmt.Stringer) (err error) {

if s.sessST.CAS(inactive, active) {
s.postCh <- struct{}{}
if verbose {
nlog.Infof("%s: inactive => active", s)
if cmn.Rom.FastV(5, cos.SmoduleTransport) {
nlog.Infoln(s.String(), "inactive => active")
}
}
return
Expand Down Expand Up @@ -212,21 +212,21 @@ func (s *streamBase) isNextReq() (reason string) {
for {
select {
case <-s.lastCh.Listen():
if verbose {
nlog.Infof("%s: end-of-stream", s)
if cmn.Rom.FastV(5, cos.SmoduleTransport) {
nlog.Infoln(s.String(), "end-of-stream")
}
reason = endOfStream
return
case <-s.stopCh.Listen():
if verbose {
nlog.Infof("%s: stopped", s)
if cmn.Rom.FastV(5, cos.SmoduleTransport) {
nlog.Infoln(s.String(), "stopped")
}
reason = reasonStopped
return
case <-s.postCh:
s.sessST.Store(active)
if verbose {
nlog.Infof("%s: active <- posted", s)
if cmn.Rom.FastV(5, cos.SmoduleTransport) {
nlog.Infoln(s.String(), "active <- posted")
}
return
}
Expand All @@ -235,9 +235,8 @@ func (s *streamBase) isNextReq() (reason string) {

func (s *streamBase) deactivate() (n int, err error) {
err = io.EOF
if verbose {
num := s.stats.Num.Load()
nlog.Infof("%s: connection teardown (%d/%d)", s, s.numCur, num)
if cmn.Rom.FastV(5, cos.SmoduleTransport) {
nlog.Infoln(s.String(), "connection teardown: [", s.numCur, s.stats.Num.Load(), "]")
}
return
}
Expand All @@ -260,7 +259,7 @@ func (s *streamBase) sendLoop(dryrun bool) {
break
}
retried = true
nlog.Errorf("%s: %v - retrying...", s, errR)
nlog.Errorln(s.String(), "err: ", errR, "- retrying...")
time.Sleep(connErrWait)
}
}
Expand Down Expand Up @@ -288,8 +287,9 @@ func (s *streamBase) sendLoop(dryrun bool) {
// cleanup
s.streamer.abortPending(err, false /*completions*/)

verbose := cmn.Rom.FastV(5, cos.SmoduleTransport)
if cnt := s.chanFull.Load(); (cnt >= 10 && cnt <= 20) || (cnt > 0 && verbose) {
nlog.Errorln(cos.ErrWorkChanFull, s.lid, "cnt", cnt)
nlog.Errorln(s.String(), cos.ErrWorkChanFull, "cnt:", cnt)
}
}

Expand Down
19 changes: 16 additions & 3 deletions transport/bundle/dmover.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ func (dm *DataMover) Open() {
}
dm.ack.streams = New(dm.ack.client, ackArgs)
}

dm.stage.opened.Store(true)
nlog.Infoln(dm.String(), "is open")
}

func (dm *DataMover) String() string {
Expand All @@ -179,18 +181,29 @@ func (dm *DataMover) Quiesce(d time.Duration) core.QuiRes {

func (dm *DataMover) Close(err error) {
if dm == nil {
nlog.Errorln("Warning: DM is <nil>")
return
}
if !dm.stage.opened.CAS(true, false) {
nlog.Errorln("Warning:", dm.String(), "not open")
return
}
if err == nil && dm.xctn != nil && dm.xctn.IsAborted() {
err = dm.xctn.AbortErr()
}
// nil: close gracefully via `fin`, otherwise abort
dm.data.streams.Close(err == nil)
if dm.useACKs() {
dm.ack.streams.Close(err == nil)
if err == nil {
dm.data.streams.Close(true)
if dm.useACKs() {
dm.ack.streams.Close(true)
}
nlog.Infoln(dm.String(), "closed")
} else {
dm.data.streams.Close(false)
if dm.useACKs() {
dm.ack.streams.Close(false)
}
nlog.Infoln(dm.String(), "aborted:", err)
}
}

Expand Down
8 changes: 4 additions & 4 deletions transport/client_fasthttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ func (s *streamBase) do(body io.Reader) (err error) {
// do
err = s.client.Do(req, resp)
if err != nil {
if verbose {
nlog.Errorf("%s: Error [%v]", s, err)
if cmn.Rom.FastV(5, cos.SmoduleTransport) {
nlog.Errorln(s.String(), "err:", err)
}
return
return err
}
// handle response & cleanup
resp.BodyWriteTo(io.Discard)
Expand All @@ -87,5 +87,5 @@ func (s *streamBase) do(body io.Reader) (err error) {
if s.streamer.compressed() {
s.streamer.resetCompression()
}
return
return nil
}
4 changes: 2 additions & 2 deletions transport/client_nethttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ func (s *streamBase) do(body io.Reader) (err error) {

response, err = s.client.Do(request)
if err != nil {
if verbose {
nlog.Errorf("%s: Error [%v]", s, err)
if cmn.Rom.FastV(5, cos.SmoduleTransport) {
nlog.Errorln(s.String(), "err:", err)
}
return
}
Expand Down
Loading

0 comments on commit 9dcbfad

Please sign in to comment.