From 11148689394eb1b750ad1947ed6715bf9e415852 Mon Sep 17 00:00:00 2001 From: Alex Aizman Date: Mon, 9 Sep 2024 10:34:20 -0400 Subject: [PATCH] [config change] close EC streams when idle, reopen on demand (major) * cluster config: add "ec_streams_time" * proxy: `onEC` when initializing bucket - s3 and, separately, native API * new sources: ais/prxec and ais/tgtec - add target /v1/ec endpoint * refactor; reduce copy/paste; remove unused code * miscellaneous micro-optimizations * part two, prev. commit: d8a71bb59fb0a18d Signed-off-by: Alex Aizman --- ais/htrun.go | 7 +- ais/kalive.go | 2 +- ais/proxy.go | 37 ++-- ais/prxbck.go | 38 ++-- ais/prxclu.go | 30 +--- ais/prxec.go | 130 ++++++++++++++ ais/prxnotif.go | 15 +- ais/prxrev.go | 4 +- ais/prxs3.go | 196 ++++++++++++--------- ais/s3redirect.go | 52 ------ ais/target.go | 93 +--------- ais/tgtbck.go | 2 +- ais/tgtcp.go | 4 +- ais/tgtec.go | 87 +++++++++ ais/tgtetl.go | 2 +- ais/tgtimpl.go | 2 +- ais/tgtobj.go | 6 +- api/apc/actmsg.go | 5 + cmd/authn/const.go | 4 +- cmn/config.go | 53 +++--- cmn/rom.go | 8 +- core/meta/bck.go | 12 +- deploy/dev/local/aisnode_config.fspaths.sh | 1 + deploy/dev/local/aisnode_config.sh | 3 +- ec/ec.go | 3 +- ec/intrareq.go | 2 +- ec/manager.go | 53 +++--- ec/metafile.go | 2 +- 28 files changed, 483 insertions(+), 370 deletions(-) create mode 100644 ais/prxec.go delete mode 100644 ais/s3redirect.go create mode 100644 ais/tgtec.go diff --git a/ais/htrun.go b/ais/htrun.go index 431ed2a009..5629520e39 100644 --- a/ais/htrun.go +++ b/ais/htrun.go @@ -624,7 +624,7 @@ func (h *htrun) _call(si *meta.Snode, bargs *bcastArgs, results *bcastResults) { freeCargs(cargs) } -const _callHdrLen = 5 +const lenhdr = 5 func (h *htrun) call(args *callArgs, smap *smapX) (res *callResult) { var ( @@ -686,7 +686,7 @@ func (h *htrun) call(args *callArgs, smap *smapX) (res *callResult) { // req header if args.req.Header == nil { - args.req.Header = make(http.Header, _callHdrLen) + args.req.Header = 
make(http.Header, lenhdr) } if smap.vstr != "" { if smap.IsPrimary(h.si) { @@ -1956,7 +1956,8 @@ func (h *htrun) fastKalive(smap *smapX, timeout time.Duration, ecActive bool) (p cargs.timeout = timeout } if ecActive { - hdr := make(http.Header, _callHdrLen) + // (target => primary) + hdr := make(http.Header, lenhdr) hdr.Set(apc.HdrActiveEC, "true") cargs.req.Header = hdr } diff --git a/ais/kalive.go b/ais/kalive.go index 2d7a6f3115..530cfc2e5a 100644 --- a/ais/kalive.go +++ b/ais/kalive.go @@ -216,7 +216,7 @@ func (pkr *palive) sendKalive(smap *smapX, timeout time.Duration, now int64, fas // check resp header from primary // (see: _respActiveEC; compare with: _recvActiveEC) if isActiveEC(hdr) { - pkr.p.lastEC.Store(now) + pkr.p.ec.last.Store(now) } } return pid, 0, err diff --git a/ais/proxy.go b/ais/proxy.go index 3179d5189b..edbc03fa94 100644 --- a/ais/proxy.go +++ b/ais/proxy.go @@ -72,8 +72,10 @@ type ( mu sync.RWMutex in atomic.Bool } - lastEC atomic.Int64 // last active EC via apc.HdrActiveEC (mono time) - + ec struct { + last atomic.Int64 // last active EC via apc.HdrActiveEC (mono time) + rust int64 // same as above + } settingNewPrimary atomic.Bool // primary executing "set new primary" request (state) readyToFastKalive atomic.Bool // primary can accept fast keepalives } @@ -224,6 +226,7 @@ func (p *proxy) Run() error { {r: apc.Vote, h: p.voteHandler, net: accessNetIntraControl}, {r: apc.Notifs, h: p.notifs.handler, net: accessNetIntraControl}, + {r: apc.EC, h: p.ecHandler, net: accessNetIntraControl}, // S3 compatibility {r: "/" + apc.S3, h: p.s3Handler, net: accessNetPublic}, @@ -734,6 +737,8 @@ func (p *proxy) httpobjget(w http.ResponseWriter, r *http.Request, origURLBck .. return } + started := time.Now() + // 3. redirect smap := p.owner.smap.get() tsi, netPub, err := smap.HrwMultiHome(bck.MakeUname(objName)) @@ -742,9 +747,10 @@ func (p *proxy) httpobjget(w http.ResponseWriter, r *http.Request, origURLBck .. 
return } if cmn.Rom.FastV(5, cos.SmoduleAIS) { - nlog.Infoln("GET " + bck.Cname(objName) + " => " + tsi.String()) + nlog.Infoln("GET", bck.Cname(objName), "=>", tsi.StringEx()) } - redirectURL := p.redirectURL(r, tsi, time.Now() /*started*/, cmn.NetIntraData, netPub) + + redirectURL := p.redirectURL(r, tsi, started, cmn.NetIntraData, netPub) http.Redirect(w, r, redirectURL, http.StatusMovedPermanently) // 4. stats @@ -816,14 +822,15 @@ func (p *proxy) httpobjput(w http.ResponseWriter, r *http.Request, apireq *apiRe // verbose if cmn.Rom.FastV(5, cos.SmoduleAIS) { - verb, s := "PUT", "" + verb := "PUT" if appendTyProvided { verb = "APPEND" } + var s string if bck.Props.Mirror.Enabled { s = " (put-mirror)" } - nlog.Infof("%s %s => %s%s", verb, bck.Cname(objName), tsi.StringEx(), s) + nlog.Infoln(verb, bck.Cname(objName), "=>", tsi.StringEx(), s) } redirectURL := p.redirectURL(r, tsi, started, cmn.NetIntraData, netPub) @@ -858,7 +865,7 @@ func (p *proxy) httpobjdelete(w http.ResponseWriter, r *http.Request) { return } if cmn.Rom.FastV(5, cos.SmoduleAIS) { - nlog.Infoln("DELETE " + bck.Cname(objName) + " => " + tsi.StringEx()) + nlog.Infoln("DELETE", bck.Cname(objName), "=>", tsi.StringEx()) } redirectURL := p.redirectURL(r, tsi, time.Now() /*started*/, cmn.NetIntraControl) http.Redirect(w, r, redirectURL, http.StatusTemporaryRedirect) @@ -1457,7 +1464,11 @@ func (p *proxy) initBckTo(w http.ResponseWriter, r *http.Request, query url.Valu bckToArgs.createAIS = true ecode, err := bckToArgs.init() - if err != nil && ecode != http.StatusNotFound { + if err == nil { + return bckTo, 0, p.onEC(bckTo) // compare with `initAndTry` + } + + if ecode != http.StatusNotFound { p.writeErr(w, r, err, ecode) return nil, 0, err } @@ -1697,7 +1708,7 @@ func (p *proxy) lsPage(bck *meta.Bck, amsg *apc.ActMsg, lsmsg *apc.LsoMsg, hdr h if lsmsg.SID != "" { s += " via " + tsi.StringEx() } - nlog.Infof("%s[%s] %s%s", amsg.Action, lsmsg.UUID, bck.Cname(""), s) + nlog.Infoln(amsg.Action, "[", 
lsmsg.UUID, "]", bck.Cname(""), s) } config := cmn.GCO.Get() @@ -2079,7 +2090,7 @@ func (p *proxy) httpobjhead(w http.ResponseWriter, r *http.Request, origURLBck . return } if cmn.Rom.FastV(5, cos.SmoduleAIS) { - nlog.Infof("%s %s => %s", r.Method, bck.Cname(objName), si.StringEx()) + nlog.Infoln(r.Method, bck.Cname(objName), "=>", si.StringEx()) } redirectURL := p.redirectURL(r, si, time.Now() /*started*/, cmn.NetIntraControl) http.Redirect(w, r, redirectURL, http.StatusTemporaryRedirect) @@ -2107,7 +2118,7 @@ func (p *proxy) httpobjpatch(w http.ResponseWriter, r *http.Request) { return } if cmn.Rom.FastV(5, cos.SmoduleAIS) { - nlog.Infof("%s %s => %s", r.Method, bck.Cname(objName), si.StringEx()) + nlog.Infoln(r.Method, bck.Cname(objName), "=>", si.StringEx()) } redirectURL := p.redirectURL(r, si, started, cmn.NetIntraControl) http.Redirect(w, r, redirectURL, http.StatusTemporaryRedirect) @@ -2409,7 +2420,7 @@ func (p *proxy) redirectObjAction(w http.ResponseWriter, r *http.Request, bck *m return } if cmn.Rom.FastV(5, cos.SmoduleAIS) { - nlog.Infof("%q %s => %s", msg.Action, bck.Cname(objName), si.StringEx()) + nlog.Infoln(msg.Action, bck.Cname(objName), "=>", si.StringEx()) } // NOTE: Code 307 is the only way to http-redirect with the original JSON payload. 
@@ -3135,7 +3146,7 @@ func (p *proxy) htHandler(w http.ResponseWriter, r *http.Request) { } baseURL := r.URL.Scheme + "://" + r.URL.Host if cmn.Rom.FastV(5, cos.SmoduleAIS) { - nlog.Infof("[HTTP CLOUD] RevProxy handler for: %s -> %s", baseURL, r.URL.Path) + nlog.Infoln("[HTTP CLOUD] RevProxy handler:", baseURL, "-->", r.URL.Path) } if r.Method == http.MethodGet || r.Method == http.MethodHead { // bck.IsHT() diff --git a/ais/prxbck.go b/ais/prxbck.go index 6ebbaff5a6..95a1dbd5c7 100644 --- a/ais/prxbck.go +++ b/ais/prxbck.go @@ -118,8 +118,7 @@ func (bctx *bctx) init() (ecode int, err error) { } if err = bctx.accessSupported(); err != nil { - ecode = http.StatusMethodNotAllowed - return + return http.StatusMethodNotAllowed, err } if bctx.skipBackend { err = bck.InitNoBackend(bctx.p.owner.bmd) @@ -127,11 +126,10 @@ func (bctx *bctx) init() (ecode int, err error) { err = bck.Init(bctx.p.owner.bmd) } if err != nil { - ecode = http.StatusBadRequest if cmn.IsErrBucketNought(err) { - ecode = http.StatusNotFound + return http.StatusNotFound, err } - return + return http.StatusBadRequest, err } bctx.isPresent = true @@ -195,6 +193,7 @@ func (bctx *bctx) initAndTry() (bck *meta.Bck, err error) { // 1. 
init bucket bck = bctx.bck if ecode, err = bctx.init(); err == nil { + err = bctx.p.onEC(bck) return } if ecode != http.StatusNotFound { @@ -239,7 +238,7 @@ func (bctx *bctx) try() (bck *meta.Bck, err error) { case cmn.IsErrBucketAlreadyExists(err): // e.g., when (re)setting backend two times in a row // TODO: return http.StatusNoContent - nlog.Infoln(bctx.p.String()+":", err, " - nothing to do") + nlog.Infoln(bctx.p.String(), err, " - nothing to do") return bck, nil default: if bctx.perms == apc.AceBckHEAD { @@ -257,13 +256,10 @@ func (bctx *bctx) try() (bck *meta.Bck, err error) { func (bctx *bctx) _try() (bck *meta.Bck, ecode int, err error) { if err = bctx.bck.Validate(); err != nil { - ecode = http.StatusBadRequest - return + return bck, http.StatusBadRequest, err } - if bctx.p.forwardCP(bctx.w, bctx.r, bctx.msg, "add-bucket", bctx.reqBody) { - err = errForwarded - return + return bck, 0, errForwarded } // am primary from this point on @@ -277,8 +273,7 @@ func (bctx *bctx) _try() (bck *meta.Bck, ecode int, err error) { } if bck.IsAIS() { if err = bctx.p.access(bctx.r.Header, nil /*bck*/, apc.AceCreateBucket); err != nil { - ecode = aceErrToCode(err) - return + return bck, aceErrToCode(err), err } nlog.Warningf("%s: %q doesn't exist, proceeding to create", bctx.p, bctx.bck) goto creadd @@ -287,8 +282,7 @@ func (bctx *bctx) _try() (bck *meta.Bck, ecode int, err error) { // lookup remote if remoteHdr, ecode, err = bctx.lookup(bck); err != nil { - bck = nil - return + return nil, ecode, err } // orig-url for the ht:// bucket @@ -301,8 +295,7 @@ func (bctx *bctx) _try() (bck *meta.Bck, ecode int, err error) { origURL = bctx.getOrigURL() ) if origURL == "" { - err = cmn.NewErrFailedTo(bctx.p, "initialize", bctx.bck, errors.New("missing HTTP URL")) - return + return bck, 0, cmn.NewErrFailedTo(bctx.p, "initialize", bctx.bck, errors.New("missing HTTP URL")) } if hbo, err = cmn.NewHTTPObjPath(origURL); err != nil { return @@ -336,15 +329,18 @@ func (bctx *bctx) _try() 
(bck *meta.Bck, ecode int, err error) { // add/create creadd: if err = bctx.p.createBucket(&apc.ActMsg{Action: action}, bck, remoteHdr); err != nil { - ecode = crerrStatus(err) - return + return bck, crerrStatus(err), err } + // finally, initialize the newly added/created if err = bck.Init(bctx.p.owner.bmd); err != nil { debug.AssertNoErr(err) - ecode = http.StatusInternalServerError - err = cmn.NewErrFailedTo(bctx.p, "post create-bucket init", bck, err, ecode) + return bck, http.StatusInternalServerError, + cmn.NewErrFailedTo(bctx.p, "post create-bucket init", bck, err, ecode) } + + err = bctx.p.onEC(bck) + bck = bctx.bck return } diff --git a/ais/prxclu.go b/ais/prxclu.go index 63ed9c5e45..b2098102e2 100644 --- a/ais/prxclu.go +++ b/ais/prxclu.go @@ -383,7 +383,7 @@ func (p *proxy) httpclupost(w http.ResponseWriter, r *http.Request) { switch apiOp { case apc.Keepalive: - // fast path(?) + // fast path if len(apiItems) > 1 { p.fastKaliveRsp(w, r, smap, config, apiItems[1] /*sid*/) return @@ -2259,31 +2259,3 @@ func mustRebalance(ctx *smapModifier, cur *smapX) bool { } return false } - -// -// (EC is active) and (is EC active?) via fastKalive -// - -// default config.Timeout.EcStreams -const ( - ecTimeout = 10 * time.Minute -) - -func isActiveEC(hdr http.Header) (ok bool) { - _, ok = hdr[apc.HdrActiveEC] - return ok -} - -// (target send => primary) -func (p *proxy) _recvActiveEC(rhdr http.Header, now int64) { - if isActiveEC(rhdr) { - p.lastEC.Store(now) - } -} - -// (primary resp => non-primary) -func (p *proxy) _respActiveEC(whdr http.Header, now int64) { - if time.Duration(now-p.lastEC.Load()) < ecTimeout { - whdr.Set(apc.HdrActiveEC, "true") - } -} diff --git a/ais/prxec.go b/ais/prxec.go new file mode 100644 index 0000000000..a720b5bd2e --- /dev/null +++ b/ais/prxec.go @@ -0,0 +1,130 @@ +// Package ais provides core functionality for the AIStore object storage. +/* + * Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. 
+ */ +package ais + +import ( + "fmt" + "net/http" + "time" + + "github.com/NVIDIA/aistore/api/apc" + "github.com/NVIDIA/aistore/cmn" + "github.com/NVIDIA/aistore/cmn/debug" + "github.com/NVIDIA/aistore/cmn/mono" + "github.com/NVIDIA/aistore/cmn/nlog" + "github.com/NVIDIA/aistore/core/meta" +) + +func (p *proxy) ecHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodPost: + p.httpecpost(w, r) + default: + cmn.WriteErr405(w, r, http.MethodGet) + } +} + +func (p *proxy) httpecpost(w http.ResponseWriter, r *http.Request) { + apiItems, err := p.parseURL(w, r, apc.URLPathEC.L, 1, false) + if err != nil { + return + } + action := apiItems[0] + switch action { + case apc.ActEcOpen: + p._setActiveEC(mono.NanoTime()) + case apc.ActEcClose: + // TODO -- FIXME: niy + default: + p.writeErr(w, r, errActEc(action)) + } +} + +// +// (EC is active) and (is EC active?) via fastKalive +// + +func isActiveEC(hdr http.Header) (ok bool) { + _, ok = hdr[apc.HdrActiveEC] + return ok +} + +// (target send => primary) +func (p *proxy) _recvActiveEC(hdr http.Header, now int64) { + if isActiveEC(hdr) { + p._setActiveEC(now) + } +} + +func (p *proxy) _setActiveEC(now int64) { + p.ec.last.Store(now) + p.ec.rust = now +} + +// (primary resp => non-primary) +func (p *proxy) _respActiveEC(hdr http.Header, now int64) { + tout := cmn.Rom.EcStreams() + if time.Duration(now-p.ec.last.Load()) < tout { + hdr.Set(apc.HdrActiveEC, "true") + } +} + +const ecStreamsNack = max(cmn.EcStreamsMini>>1, 3*time.Minute) + +func (p *proxy) onEC(bck *meta.Bck) error { + if !bck.Props.EC.Enabled || cmn.Rom.EcStreams() < 0 /* cmn.EcStreamsEver */ { + return nil + } + now := mono.NanoTime() + debug.Assert(cmn.Rom.EcStreams() >= cmn.EcStreamsMini, cmn.Rom.EcStreams(), " vs ", cmn.EcStreamsMini) + if time.Duration(now-p.ec.rust) < ecStreamsNack { + return nil + } + return p._onEC(now) +} + +func (p *proxy) _onEC(now int64) error { + last := p.ec.last.Load() + if time.Duration(now-last) 
< ecStreamsNack { + return nil + } + + // 1. targets + args := allocBcArgs() + { + args.smap = p.owner.smap.get() + args.req = cmn.HreqArgs{Method: http.MethodPost, Path: apc.URLPathEC.Join(apc.ActEcOpen)} + args.network = cmn.NetIntraControl + args.timeout = cmn.Rom.CplaneOperation() + args.nodes = []meta.NodeMap{args.smap.Tmap} + args.nodeCount = len(args.smap.Tmap) + } + results := p.bcastNodes(args) + + for _, res := range results { + if res.err != nil { + freeBcArgs(args) + return fmt.Errorf("%s: %s failed to open EC streams: %v", p, res.si.StringEx(), res.err) + } + } + + // 2. proxies, upon success + if args.nodeCount = len(args.smap.Pmap) - 1; args.nodeCount == 0 { + goto ex + } + args.nodes = []meta.NodeMap{args.smap.Pmap} + freeBcastRes(results) + results = p.bcastNodes(args) + for _, res := range results { + if res.err != nil { + nlog.Warningf("%s: %s failed to get notified: %v", p, res.si.StringEx(), res.err) + } + } + +ex: + freeBcArgs(args) + freeBcastRes(results) + return nil +} diff --git a/ais/prxnotif.go b/ais/prxnotif.go index 1ba19307da..8cb3238710 100644 --- a/ais/prxnotif.go +++ b/ais/prxnotif.go @@ -193,7 +193,7 @@ func (n *notifs) _finished(nl nl.Listener, tsi *meta.Snode, msg *core.NotifMsg) nl.SetStats(tsi.ID(), stats) if abortedSnap != msg.AbortedX && cmn.Rom.FastV(4, cos.SmoduleAIS) { - nlog.Infof("Warning: %s: %t vs %t [%s]", msg, abortedSnap, msg.AbortedX, nl.String()) + nlog.Infoln("Warning:", msg.String(), "aborted", abortedSnap, "vs", msg.AbortedX, nl.String()) } aborted = aborted || abortedSnap } @@ -430,7 +430,7 @@ func (n *notifs) bcastGetStats(nl nl.Listener, dur time.Duration) { done = done || n.markFinished(nl, res.si, err, true) // NOTE: not-found at one ==> all done nl.Unlock() } else if cmn.Rom.FastV(4, cos.SmoduleAIS) { - nlog.Errorf("%s: %s, node %s: %v", n.p, nl, res.si.StringEx(), res.unwrap()) + nlog.Errorln(n.p.String(), nl.String(), "node", res.si.StringEx(), res.unwrap()) } } freeBcastRes(results) @@ -463,14 +463,17 
@@ func (n *notifs) ListenSmapChanged() { return } var ( - remnl = make(map[string]nl.Listener) - remid = make(cos.StrKVs) + remnl map[string]nl.Listener + remid cos.StrKVs ) n.nls.mtx.RLock() for uuid, nl := range n.nls.m { nl.RLock() for sid := range nl.ActiveNotifiers() { if node := smap.GetActiveNode(sid); node == nil { + if remnl == nil { + remnl, remid = _remini() + } remnl[uuid] = nl remid[uuid] = sid break @@ -523,6 +526,10 @@ repeat: clear(remid) } +func _remini() (map[string]nl.Listener, cos.StrKVs) { + return make(map[string]nl.Listener, 1), make(cos.StrKVs, 1) +} + func (n *notifs) MarshalJSON() ([]byte, error) { if n.size() == 0 { return nil, nil diff --git a/ais/prxrev.go b/ais/prxrev.go index 5bd548cc51..501e32e581 100644 --- a/ais/prxrev.go +++ b/ais/prxrev.go @@ -98,9 +98,9 @@ func (p *proxy) forwardCP(w http.ResponseWriter, r *http.Request, msg *apc.ActMs if cmn.Rom.FastV(5, cos.SmoduleAIS) { pname := smap.Primary.StringEx() if msg != nil { - nlog.Infof("%s: forwarding \"%s:%s\" to the primary %s", p, msg.Action, s, pname) + nlog.Infoln(p.String(), "forwarding [", msg.Action, s, "] to the primary", pname) } else { - nlog.Infof("%s: forwarding %q to the primary %s", p, s, pname) + nlog.Infoln(p.String(), "forwarding [", s, "] to the primary", pname) } } primary.rp.ServeHTTP(w, r) diff --git a/ais/prxs3.go b/ais/prxs3.go index 9d72b05d97..ffc23245b9 100644 --- a/ais/prxs3.go +++ b/ais/prxs3.go @@ -18,6 +18,7 @@ import ( "github.com/NVIDIA/aistore/cmn" "github.com/NVIDIA/aistore/cmn/cos" "github.com/NVIDIA/aistore/cmn/debug" + "github.com/NVIDIA/aistore/cmn/feat" "github.com/NVIDIA/aistore/cmn/mono" "github.com/NVIDIA/aistore/cmn/nlog" "github.com/NVIDIA/aistore/core/meta" @@ -175,9 +176,8 @@ func (p *proxy) putBckS3(w http.ResponseWriter, r *http.Request, bucket string) // DELETE /s3/ (TODO: AWS allows to delete bucket only if it is empty) func (p *proxy) delBckS3(w http.ResponseWriter, r *http.Request, bucket string) { - bck, err, ecode := 
meta.InitByNameOnly(bucket, p.owner.bmd) - if err != nil { - s3.WriteErr(w, r, err, ecode) + bck := p.initByNameOnly(w, r, bucket) + if bck == nil { return } if err := bck.Allow(apc.AceDestroyBucket); err != nil { @@ -200,22 +200,21 @@ func (p *proxy) delBckS3(w http.ResponseWriter, r *http.Request, bucket string) } func (p *proxy) handleMptUpload(w http.ResponseWriter, r *http.Request, parts []string) { - bucket := parts[0] - bck, err, ecode := meta.InitByNameOnly(bucket, p.owner.bmd) - if err != nil { - s3.WriteErr(w, r, err, ecode) + bck := p.initByNameOnly(w, r, parts[0] /*bucket*/) + if bck == nil { return } if err := bck.Allow(apc.AcePUT); err != nil { s3.WriteErr(w, r, err, http.StatusForbidden) return } - smap := p.owner.smap.get() objName := s3.ObjName(parts) if err := cmn.ValidateObjName(objName); err != nil { s3.WriteErr(w, r, err, 0) return } + + smap := p.owner.smap.get() si, netPub, err := smap.HrwMultiHome(bck.MakeUname(objName)) if err != nil { s3.WriteErr(w, r, err, 0) @@ -229,9 +228,8 @@ func (p *proxy) handleMptUpload(w http.ResponseWriter, r *http.Request, parts [] // DELETE /s3/i?delete // Delete a list of objects func (p *proxy) delMultipleObjs(w http.ResponseWriter, r *http.Request, bucket string) { - bck, err, ecode := meta.InitByNameOnly(bucket, p.owner.bmd) - if err != nil { - s3.WriteErr(w, r, err, ecode) + bck := p.initByNameOnly(w, r, bucket) + if bck == nil { return } if err := bck.Allow(apc.AceObjDELETE); err != nil { @@ -292,9 +290,8 @@ func (p *proxy) delMultipleObjs(w http.ResponseWriter, r *http.Request, bucket s // HEAD /s3/ func (p *proxy) headBckS3(w http.ResponseWriter, r *http.Request, bucket string) { - bck, err, ecode := meta.InitByNameOnly(bucket, p.owner.bmd) - if err != nil { - s3.WriteErr(w, r, err, ecode) + bck := p.initByNameOnly(w, r, bucket) + if bck == nil { return } if err := bck.Allow(apc.AceBckHEAD); err != nil { @@ -317,9 +314,8 @@ func (p *proxy) headBckS3(w http.ResponseWriter, r *http.Request, bucket string) 
// GET /s3/ // https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html func (p *proxy) listObjectsS3(w http.ResponseWriter, r *http.Request, bucket string, q url.Values) { - bck, err, ecode := meta.InitByNameOnly(bucket, p.owner.bmd) - if err != nil { - s3.WriteErr(w, r, err, ecode) + bck := p.initByNameOnly(w, r, bucket) + if bck == nil { return } amsg := &apc.ActMsg{Action: apc.ActList} @@ -431,9 +427,8 @@ func (p *proxy) copyObjS3(w http.ResponseWriter, r *http.Request, items []string return } // src - bckSrc, err, ecode := meta.InitByNameOnly(parts[0], p.owner.bmd) - if err != nil { - s3.WriteErr(w, r, err, ecode) + bckSrc := p.initByNameOnly(w, r, parts[0]) + if bckSrc == nil { return } if err := bckSrc.Allow(apc.AceGET); err != nil { @@ -441,27 +436,24 @@ func (p *proxy) copyObjS3(w http.ResponseWriter, r *http.Request, items []string return } // dst - bckDst, err, ecode := meta.InitByNameOnly(items[0], p.owner.bmd) - if err != nil { - s3.WriteErr(w, r, err, ecode) + bckDst := p.initByNameOnly(w, r, items[0]) + if bckDst == nil { return } - var ( - si *meta.Snode - smap = p.owner.smap.get() - ) - if err = bckDst.Allow(apc.AcePUT); err != nil { + if err := bckDst.Allow(apc.AcePUT); err != nil { s3.WriteErr(w, r, err, http.StatusForbidden) return } + objName := strings.Trim(parts[1], "/") - si, err = smap.HrwName2T(bckSrc.MakeUname(objName)) + smap := p.owner.smap.get() + si, err := smap.HrwName2T(bckSrc.MakeUname(objName)) if err != nil { s3.WriteErr(w, r, err, 0) return } if cmn.Rom.FastV(5, cos.SmoduleS3) { - nlog.Infof("COPY: %s %s => %s/%v %s", r.Method, bckSrc.Cname(objName), bckDst.Cname(""), items, si) + nlog.Infoln("COPY:", r.Method, bckSrc.Cname(objName), "=>", bckDst.Cname(""), items, si.StringEx()) } started := time.Now() redirectURL := p.redirectURL(r, si, started, cmn.NetIntraControl) @@ -471,18 +463,11 @@ func (p *proxy) copyObjS3(w http.ResponseWriter, r *http.Request, items []string // PUT /s3// - with empty `cos.S3HdrObjSrc` // 
(compare with p.copyObjS3) func (p *proxy) directPutObjS3(w http.ResponseWriter, r *http.Request, items []string) { - bucket := items[0] - bck, err, ecode := meta.InitByNameOnly(bucket, p.owner.bmd) - if err != nil { - s3.WriteErr(w, r, err, ecode) + bck := p.initByNameOnly(w, r, items[0] /*bucket*/) + if bck == nil { return } - var ( - netPub string - si *meta.Snode - smap = p.owner.smap.get() - ) - if err = bck.Allow(apc.AcePUT); err != nil { + if err := bck.Allow(apc.AcePUT); err != nil { s3.WriteErr(w, r, err, http.StatusForbidden) return } @@ -495,34 +480,29 @@ func (p *proxy) directPutObjS3(w http.ResponseWriter, r *http.Request, items []s s3.WriteErr(w, r, err, 0) return } - si, netPub, err = smap.HrwMultiHome(bck.MakeUname(objName)) + + smap := p.owner.smap.get() + si, netPub, err := smap.HrwMultiHome(bck.MakeUname(objName)) if err != nil { s3.WriteErr(w, r, err, 0) return } if cmn.Rom.FastV(5, cos.SmoduleS3) { - nlog.Infof("%s %s => %s", r.Method, bck.Cname(objName), si) + nlog.Infoln(r.Method, bck.Cname(objName), "=>", si.StringEx()) } started := time.Now() + redirectURL := p.redirectURL(r, si, started, cmn.NetIntraData, netPub) p.s3Redirect(w, r, si, redirectURL, bck.Name) } // GET /s3// func (p *proxy) getObjS3(w http.ResponseWriter, r *http.Request, items []string, q url.Values, listMultipart bool) { - bucket := items[0] - - bck, err, ecode := meta.InitByNameOnly(bucket, p.owner.bmd) - if err != nil { - s3.WriteErr(w, r, err, ecode) + bck := p.initByNameOnly(w, r, items[0] /*bucket*/) + if bck == nil { return } - var ( - si *meta.Snode - netPub string - smap = p.owner.smap.get() - ) - if err = bck.Allow(apc.AceGET); err != nil { + if err := bck.Allow(apc.AceGET); err != nil { s3.WriteErr(w, r, err, http.StatusForbidden) return } @@ -539,15 +519,18 @@ func (p *proxy) getObjS3(w http.ResponseWriter, r *http.Request, items []string, s3.WriteErr(w, r, err, 0) return } - si, netPub, err = smap.HrwMultiHome(bck.MakeUname(objName)) + + smap := 
p.owner.smap.get() + si, netPub, err := smap.HrwMultiHome(bck.MakeUname(objName)) if err != nil { s3.WriteErr(w, r, err, 0) return } if cmn.Rom.FastV(5, cos.SmoduleS3) { - nlog.Infof("%s %s => %s", r.Method, bck.Cname(objName), si) + nlog.Infoln(r.Method, bck.Cname(objName), "=>", si.StringEx()) } started := time.Now() + redirectURL := p.redirectURL(r, si, started, cmn.NetIntraData, netPub) p.s3Redirect(w, r, si, redirectURL, bck.Name) } @@ -605,20 +588,19 @@ func (p *proxy) headObjS3(w http.ResponseWriter, r *http.Request, items []string s3.WriteErr(w, r, errS3Obj, 0) return } - bucket, objName := items[0], s3.ObjName(items) - if err := cmn.ValidateObjName(objName); err != nil { - s3.WriteErr(w, r, err, 0) - return - } - bck, err, ecode := meta.InitByNameOnly(bucket, p.owner.bmd) - if err != nil { - s3.WriteErr(w, r, err, ecode) + bck := p.initByNameOnly(w, r, items[0] /*bucket*/) + if bck == nil { return } if err := bck.Allow(apc.AceObjHEAD); err != nil { s3.WriteErr(w, r, err, http.StatusForbidden) return } + objName := s3.ObjName(items) + if err := cmn.ValidateObjName(objName); err != nil { + s3.WriteErr(w, r, err, 0) + return + } smap := p.owner.smap.get() si, err := smap.HrwName2T(bck.MakeUname(objName)) if err != nil { @@ -626,7 +608,7 @@ func (p *proxy) headObjS3(w http.ResponseWriter, r *http.Request, items []string return } if cmn.Rom.FastV(5, cos.SmoduleS3) { - nlog.Infof("%s %s => %s", r.Method, bck.Cname(objName), si) + nlog.Infoln(r.Method, bck.Cname(objName), "=>", si.StringEx()) } p.reverseNodeRequest(w, r, si) @@ -634,22 +616,16 @@ func (p *proxy) headObjS3(w http.ResponseWriter, r *http.Request, items []string // DELETE /s3// func (p *proxy) delObjS3(w http.ResponseWriter, r *http.Request, items []string) { - bucket := items[0] - bck, err, ecode := meta.InitByNameOnly(bucket, p.owner.bmd) - if err != nil { - s3.WriteErr(w, r, err, ecode) + if len(items) < 2 { + s3.WriteErr(w, r, errS3Obj, 0) return } - var ( - si *meta.Snode - smap = 
p.owner.smap.get() - ) - if err = bck.Allow(apc.AceObjDELETE); err != nil { - s3.WriteErr(w, r, err, http.StatusForbidden) + bck := p.initByNameOnly(w, r, items[0] /*bucket*/) + if bck == nil { return } - if len(items) < 2 { - s3.WriteErr(w, r, errS3Obj, 0) + if err := bck.Allow(apc.AceObjDELETE); err != nil { + s3.WriteErr(w, r, err, http.StatusForbidden) return } objName := s3.ObjName(items) @@ -657,13 +633,15 @@ func (p *proxy) delObjS3(w http.ResponseWriter, r *http.Request, items []string) s3.WriteErr(w, r, err, 0) return } - si, err = smap.HrwName2T(bck.MakeUname(objName)) + + smap := p.owner.smap.get() + si, err := smap.HrwName2T(bck.MakeUname(objName)) if err != nil { s3.WriteErr(w, r, err, 0) return } if cmn.Rom.FastV(5, cos.SmoduleS3) { - nlog.Infof("%s %s => %s", r.Method, bck.Cname(objName), si) + nlog.Infoln(r.Method, bck.Cname(objName), "=>", si.StringEx()) } started := time.Now() redirectURL := p.redirectURL(r, si, started, cmn.NetIntraControl) @@ -672,9 +650,8 @@ func (p *proxy) delObjS3(w http.ResponseWriter, r *http.Request, items []string) // GET /s3/?versioning func (p *proxy) getBckVersioningS3(w http.ResponseWriter, r *http.Request, bucket string) { - bck, err, ecode := meta.InitByNameOnly(bucket, p.owner.bmd) - if err != nil { - s3.WriteErr(w, r, err, ecode) + bck := p.initByNameOnly(w, r, bucket) + if bck == nil { return } resp := s3.NewVersioningConfiguration(bck.Props.Versioning.Enabled) @@ -700,9 +677,8 @@ func (p *proxy) putBckVersioningS3(w http.ResponseWriter, r *http.Request, bucke if p.forwardCP(w, r, nil, msg.Action+"-"+bucket) { return } - bck, err, ecode := meta.InitByNameOnly(bucket, p.owner.bmd) - if err != nil { - s3.WriteErr(w, r, err, ecode) + bck := p.initByNameOnly(w, r, bucket) + if bck == nil { return } decoder := xml.NewDecoder(r.Body) @@ -725,3 +701,55 @@ func (p *proxy) putBckVersioningS3(w http.ResponseWriter, r *http.Request, bucke s3.WriteErr(w, r, err, 0) } } + +// +// misc. 
utils +// + +func (p *proxy) initByNameOnly(w http.ResponseWriter, r *http.Request, bucket string) *meta.Bck { + bck, err, ecode := meta.InitByNameOnly(bucket, p.owner.bmd) + if err != nil { + s3.WriteErr(w, r, err, ecode) + return nil + } + if err = p.onEC(bck); err != nil { + s3.WriteErr(w, r, err, 0) + return nil + } + return bck +} + +// either reverse-proxy call _or_ HTTP-redirect to a designated node +// see also: docs/s3compat.md +func (p *proxy) s3Redirect(w http.ResponseWriter, r *http.Request, si *meta.Snode, redirectURL, bucket string) { + if cmn.Rom.Features().IsSet(feat.S3ReverseProxy) { + p.reverseNodeRequest(w, r, si) + return + } + + h := w.Header() + h.Set(cos.HdrLocation, redirectURL) + h.Set(cos.HdrContentType, "text/xml; charset=utf-8") + h.Set(cos.HdrServer, s3.AISServer) + w.WriteHeader(http.StatusTemporaryRedirect) + ep := extractEndpoint(redirectURL) + body := "" + + "TemporaryRedirectRedirect" + + "" + ep + "" + + "" + bucket + "" + fmt.Fprint(w, body) +} + +// extractEndpoint extracts an S3 endpoint from the full URL path. +// Endpoint is a host name with port and root URL path (if exists). +// E.g. for AIS `http://localhost:8080/s3/bck1/obj1` the endpoint +// would be `localhost:8080/s3` +func extractEndpoint(path string) string { + ep := path + if idx := strings.Index(ep, "/"+apc.S3); idx > 0 { + ep = ep[:idx+3] + } + ep = strings.TrimPrefix(ep, "http://") + ep = strings.TrimPrefix(ep, "https://") + return ep +} diff --git a/ais/s3redirect.go b/ais/s3redirect.go deleted file mode 100644 index 681ceb2e21..0000000000 --- a/ais/s3redirect.go +++ /dev/null @@ -1,52 +0,0 @@ -// Package ais provides core functionality for the AIStore object storage. -/* - * Copyright (c) 2021-2024, NVIDIA CORPORATION. All rights reserved. 
- */ -package ais - -import ( - "fmt" - "net/http" - "strings" - - "github.com/NVIDIA/aistore/ais/s3" - "github.com/NVIDIA/aistore/api/apc" - "github.com/NVIDIA/aistore/cmn" - "github.com/NVIDIA/aistore/cmn/cos" - "github.com/NVIDIA/aistore/cmn/feat" - "github.com/NVIDIA/aistore/core/meta" -) - -// s3Redirect performs reverse proxy call or HTTP-redirects to a designated node -// in a cluster based on feature flag. See also: docs/s3compat.md -func (p *proxy) s3Redirect(w http.ResponseWriter, r *http.Request, si *meta.Snode, redirectURL, bucket string) { - if cmn.GCO.Get().Features.IsSet(feat.S3ReverseProxy) { - p.reverseNodeRequest(w, r, si) - } else { - h := w.Header() - h.Set(cos.HdrLocation, redirectURL) - h.Set(cos.HdrContentType, "text/xml; charset=utf-8") - h.Set(cos.HdrServer, s3.AISServer) - w.WriteHeader(http.StatusTemporaryRedirect) - ep := extractEndpoint(redirectURL) - body := "" + - "TemporaryRedirectRedirect" + - "" + ep + "" + - "" + bucket + "" - fmt.Fprint(w, body) - } -} - -// extractEndpoint extracts an S3 endpoint from the full URL path. -// Endpoint is a host name with port and root URL path (if exists). -// E.g. 
for AIS `http://localhost:8080/s3/bck1/obj1` the endpoint -// would be `localhost:8080/s3` -func extractEndpoint(path string) string { - ep := path - if idx := strings.Index(ep, "/"+apc.S3); idx > 0 { - ep = ep[:idx+3] - } - ep = strings.TrimPrefix(ep, "http://") - ep = strings.TrimPrefix(ep, "https://") - return ep -} diff --git a/ais/target.go b/ais/target.go index 014c5df78d..c4223dc6da 100644 --- a/ais/target.go +++ b/ais/target.go @@ -8,7 +8,6 @@ import ( "context" "errors" "fmt" - "io" "net" "net/http" "net/url" @@ -485,7 +484,7 @@ func (t *target) initRecvHandlers() { {r: apc.Metasync, h: t.metasyncHandler, net: accessNetIntraControl}, {r: apc.Health, h: t.healthHandler, net: accessNetPublicControl}, {r: apc.Xactions, h: t.xactHandler, net: accessNetIntraControl}, - {r: apc.EC, h: t.ecHandler, net: accessNetIntraData}, + {r: apc.EC, h: t.ecHandler, net: accessNetIntraControl}, {r: apc.Vote, h: t.voteHandler, net: accessNetIntraControl}, {r: apc.Txn, h: t.txnHandler, net: accessNetIntraControl}, {r: apc.ObjStream, h: transport.RxAnyStream, net: accessControlData}, @@ -639,17 +638,6 @@ func (t *target) objectHandler(w http.ResponseWriter, r *http.Request) { } } -// verb /v1/slices -// Non-public inerface -func (t *target) ecHandler(w http.ResponseWriter, r *http.Request) { - switch r.Method { - case http.MethodGet: - t.httpecget(w, r) - default: - cmn.WriteErr405(w, r, http.MethodGet) - } -} - // // httpobj* handlers // @@ -1216,85 +1204,6 @@ func (t *target) httpobjpatch(w http.ResponseWriter, r *http.Request, apireq *ap lom.Persist() } -// -// httpec* handlers -// - -// Returns a slice. Does not use GFN. 
-func (t *target) httpecget(w http.ResponseWriter, r *http.Request) { - apireq := apiReqAlloc(3, apc.URLPathEC.L, false) - apireq.bckIdx = 1 - if err := t.parseReq(w, r, apireq); err != nil { - apiReqFree(apireq) - return - } - switch apireq.items[0] { - case ec.URLMeta: - t.sendECMetafile(w, r, apireq.bck, apireq.items[2]) - case ec.URLCT: - lom := core.AllocLOM(apireq.items[2]) - t.sendECCT(w, r, apireq.bck, lom) - core.FreeLOM(lom) - default: - t.writeErrURL(w, r) - } - apiReqFree(apireq) -} - -// Returns a CT's metadata. -func (t *target) sendECMetafile(w http.ResponseWriter, r *http.Request, bck *meta.Bck, objName string) { - if err := bck.Init(t.owner.bmd); err != nil { - if !cmn.IsErrRemoteBckNotFound(err) { // is ais - t.writeErr(w, r, err, Silent) - return - } - } - md, err := ec.ObjectMetadata(bck, objName) - if err != nil { - if os.IsNotExist(err) { - t.writeErr(w, r, err, http.StatusNotFound, Silent) - } else { - t.writeErr(w, r, err, http.StatusInternalServerError, Silent) - } - return - } - b := md.NewPack() - w.Header().Set(cos.HdrContentLength, strconv.Itoa(len(b))) - w.Write(b) -} - -func (t *target) sendECCT(w http.ResponseWriter, r *http.Request, bck *meta.Bck, lom *core.LOM) { - if err := lom.InitBck(bck.Bucket()); err != nil { - if cmn.IsErrRemoteBckNotFound(err) { - t.BMDVersionFixup(r) - err = lom.InitBck(bck.Bucket()) - } - if err != nil { - t.writeErr(w, r, err) - return - } - } - sliceFQN := lom.Mountpath().MakePathFQN(bck.Bucket(), fs.ECSliceType, lom.ObjName) - finfo, err := os.Stat(sliceFQN) - if err != nil { - t.writeErr(w, r, err, http.StatusNotFound, Silent) - return - } - file, err := os.Open(sliceFQN) - if err != nil { - t.FSHC(err, lom.Mountpath(), sliceFQN) - t.writeErr(w, r, err, http.StatusInternalServerError) - return - } - - w.Header().Set(cos.HdrContentLength, strconv.FormatInt(finfo.Size(), 10)) - _, err = io.Copy(w, file) // No need for `io.CopyBuffer` as `sendfile` syscall will be used. 
- cos.Close(file) - if err != nil { - nlog.Errorf("Failed to send slice %s: %v", lom.Cname(), err) - } -} - // called under lock func (t *target) putApndArch(r *http.Request, lom *core.LOM, started int64, dpq *dpq) (int, error) { var ( diff --git a/ais/tgtbck.go b/ais/tgtbck.go index d707fc3df3..85dde80038 100644 --- a/ais/tgtbck.go +++ b/ais/tgtbck.go @@ -486,7 +486,7 @@ func (t *target) httpbckhead(w http.ResponseWriter, r *http.Request, apireq *api } if cmn.Rom.FastV(5, cos.SmoduleAIS) { pid := apireq.query.Get(apc.QparamProxyID) - nlog.Infof("%s %s <= %s", r.Method, apireq.bck, pid) + nlog.Infoln(r.Method, apireq.bck, "<=", pid) } debug.Assert(!apireq.bck.IsAIS()) diff --git a/ais/tgtcp.go b/ais/tgtcp.go index 856599c325..4e3df5d2d4 100644 --- a/ais/tgtcp.go +++ b/ais/tgtcp.go @@ -1170,7 +1170,7 @@ func (t *target) metasyncPost(w http.ResponseWriter, r *http.Request) { } ntid := msg.UUID if cmn.Rom.FastV(4, cos.SmoduleAIS) { - nlog.Infof("%s %s: %s, join %s", t, msg, newSmap, meta.Tname(ntid)) // "start-gfn" | "stop-gfn" + nlog.Infoln(t.String(), msg.String(), newSmap.String(), "join", meta.Tname(ntid)) // "start-gfn" | "stop-gfn" } switch msg.Action { case apc.ActStartGFN: @@ -1188,7 +1188,7 @@ func (t *target) metasyncPost(w http.ResponseWriter, r *http.Request) { func (t *target) healthHandler(w http.ResponseWriter, r *http.Request) { if t.regstate.disabled.Load() && daemon.cli.target.standby { if cmn.Rom.FastV(4, cos.SmoduleAIS) { - nlog.Warningf("[health] %s: standing by...", t) + nlog.Warningln("[health]", t.String(), "standing by...") } } else if !t.NodeStarted() { w.WriteHeader(http.StatusServiceUnavailable) diff --git a/ais/tgtec.go b/ais/tgtec.go new file mode 100644 index 0000000000..d2e9b15fb0 --- /dev/null +++ b/ais/tgtec.go @@ -0,0 +1,87 @@ +// Package ais provides core functionality for the AIStore object storage. +/* + * Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. 
+ */ +package ais + +import ( + "fmt" + "net/http" + "os" + "strconv" + + "github.com/NVIDIA/aistore/api/apc" + "github.com/NVIDIA/aistore/cmn" + "github.com/NVIDIA/aistore/cmn/cos" + "github.com/NVIDIA/aistore/core/meta" + "github.com/NVIDIA/aistore/ec" +) + +func (t *target) ecHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodGet: + t.httpecget(w, r) + case http.MethodPost: + t.httpecpost(w, r) + default: + cmn.WriteErr405(w, r, http.MethodGet) + } +} + +func (t *target) httpecget(w http.ResponseWriter, r *http.Request) { + apireq := apiReqAlloc(3, apc.URLPathEC.L, false) + apireq.bckIdx = 1 + if err := t.parseReq(w, r, apireq); err != nil { + apiReqFree(apireq) + return + } + switch apireq.items[0] { + case ec.URLMeta: + t.sendECMetafile(w, r, apireq.bck, apireq.items[2]) + default: + t.writeErrURL(w, r) + } + apiReqFree(apireq) +} + +// Returns a CT's metadata. +func (t *target) sendECMetafile(w http.ResponseWriter, r *http.Request, bck *meta.Bck, objName string) { + if err := bck.Init(t.owner.bmd); err != nil { + if !cmn.IsErrRemoteBckNotFound(err) { // is ais + t.writeErr(w, r, err, Silent) + return + } + } + md, err := ec.ObjectMetadata(bck, objName) + if err != nil { + if os.IsNotExist(err) { + t.writeErr(w, r, err, http.StatusNotFound, Silent) + } else { + t.writeErr(w, r, err, http.StatusInternalServerError, Silent) + } + return + } + b := md.NewPack() + w.Header().Set(cos.HdrContentLength, strconv.Itoa(len(b))) + w.Write(b) +} + +func (t *target) httpecpost(w http.ResponseWriter, r *http.Request) { + apiItems, err := t.parseURL(w, r, apc.URLPathEC.L, 1, false) + if err != nil { + return + } + action := apiItems[0] + switch action { + case apc.ActEcOpen: + ec.ECM.OpenStreams() + case apc.ActEcClose: + ec.ECM.CloseStreams() + default: + t.writeErr(w, r, errActEc(action)) + } +} + +func errActEc(act string) error { + return fmt.Errorf(fmtErrInvaldAction, act, []string{apc.ActEcOpen, apc.ActEcClose}) +} diff --git 
a/ais/tgtetl.go b/ais/tgtetl.go index 46f216b582..5e723934e6 100644 --- a/ais/tgtetl.go +++ b/ais/tgtetl.go @@ -81,7 +81,7 @@ func (t *target) handleETLPut(w http.ResponseWriter, r *http.Request) { return } if cmn.Rom.FastV(4, cos.SmoduleETL) { - nlog.Infoln(t.String() + ": " + initMsg.String()) + nlog.Infoln(t.String(), initMsg.String()) } } diff --git a/ais/tgtimpl.go b/ais/tgtimpl.go index 4db955a663..85ef252a0b 100644 --- a/ais/tgtimpl.go +++ b/ais/tgtimpl.go @@ -154,7 +154,7 @@ func (t *target) GetCold(ctx context.Context, lom *core.LOM, owt cmn.OWT) (ecode if owt == cmn.OwtGetTryLock { if !lom.TryLock(true) { if cmn.Rom.FastV(4, cos.SmoduleAIS) { - nlog.Warningf("%s: %s(%s) is busy", t, lom, owt) + nlog.Warningln(t.String(), lom.String(), owt.String(), "is busy") } return 0, cmn.ErrSkip // e.g. prefetch can skip it and keep on going } diff --git a/ais/tgtobj.go b/ais/tgtobj.go index 443471e753..02485230c0 100644 --- a/ais/tgtobj.go +++ b/ais/tgtobj.go @@ -175,7 +175,7 @@ func (poi *putOI) putObject() (ecode int, err error) { if !poi.skipVC && !poi.coldGET && !poi.cksumToUse.IsEmpty() { if poi.lom.EqCksum(poi.cksumToUse) { if cmn.Rom.FastV(4, cos.SmoduleAIS) { - nlog.Infof("destination %s has identical %s: PUT is a no-op", poi.lom, poi.cksumToUse) + nlog.Infoln(poi.lom.String(), "has identical", poi.cksumToUse.String(), "- PUT is a no-op") } cos.DrainReader(poi.r) return 0, nil @@ -984,7 +984,7 @@ func (goi *getOI) getFromNeighbor(lom *core.LOM, tsi *meta.Snode) bool { freePOI(poi) if erp == nil { if cmn.Rom.FastV(5, cos.SmoduleAIS) { - nlog.Infof("%s: gfn %s <= %s", goi.t, goi.lom, tsi) + nlog.Infoln(goi.t.String(), "gfn", goi.lom.String(), "<=", tsi.StringEx()) } return true } @@ -1342,7 +1342,7 @@ func (a *apndOI) apnd(buf []byte) (packedHdl string, ecode int, err error) { cos.NamedVal64{Name: stats.AppendLatency, Value: lat}, ) if cmn.Rom.FastV(4, cos.SmoduleAIS) { - nlog.Infof("APPEND %s: %s", a.lom, lat) + nlog.Infoln("APPEND", a.lom.String(), 
time.Duration(lat)) } return } diff --git a/api/apc/actmsg.go b/api/apc/actmsg.go index 40f96cf238..02d1fa498b 100644 --- a/api/apc/actmsg.go +++ b/api/apc/actmsg.go @@ -143,6 +143,11 @@ const ( NodeDecommission = "decommission" ) +const ( + ActEcOpen = "open-ec-streams" + ActEcClose = "close-ec-streams" +) + // ActMsg is a JSON-formatted control structures used in a majority of API calls type ( ActMsg struct { diff --git a/cmd/authn/const.go b/cmd/authn/const.go index 4672f8f07a..048472c78c 100644 --- a/cmd/authn/const.go +++ b/cmd/authn/const.go @@ -21,5 +21,7 @@ const ( adminUserID = "admin" adminUserPass = "admin" - foreverTokenTime = 24 * 365 * 20 * time.Hour // kind of never-expired token // TODO -- FIXME: make it -1s + // when user-provided token expiration time is zero it means the token never expires; + // we then create a token and set it to expire in 20 years - effectively, never + foreverTokenTime = 20 * 365 * 24 * time.Hour ) diff --git a/cmn/config.go b/cmn/config.go index 386afe4225..53d31c487b 100644 --- a/cmn/config.go +++ b/cmn/config.go @@ -255,12 +255,14 @@ type ( // maximum intra-cluster latencies (in the increasing order) TimeoutConf struct { - CplaneOperation cos.Duration `json:"cplane_operation"` // read-mostly via global cmn.Rom.CplaneOperation - MaxKeepalive cos.Duration `json:"max_keepalive"` // ditto, cmn.Rom.MaxKeepalive - see below - MaxHostBusy cos.Duration `json:"max_host_busy"` - Startup cos.Duration `json:"startup_time"` - JoinAtStartup cos.Duration `json:"join_startup_time"` // (join cluster at startup) timeout - SendFile cos.Duration `json:"send_file_time"` + CplaneOperation cos.Duration `json:"cplane_operation"` // read-mostly via global cmn.Rom.CplaneOperation + MaxKeepalive cos.Duration `json:"max_keepalive"` // ditto, cmn.Rom.MaxKeepalive - see below + MaxHostBusy cos.Duration `json:"max_host_busy"` // 2-phase transactions and more + Startup cos.Duration `json:"startup_time"` // primary wait for joins at (primary's) startup; 
indirectly, cluster startup + JoinAtStartup cos.Duration `json:"join_startup_time"` // (join cluster at startup) timeout; (2 * Startup) when zero + SendFile cos.Duration `json:"send_file_time"` // large file or blob and/or slow network + // intra-cluster EC streams; default=EcStreamsDflt; never timeout when negative + EcStreams cos.Duration `json:"ec_streams_time,omitempty"` } TimeoutConfToSet struct { CplaneOperation *cos.Duration `json:"cplane_operation,omitempty"` @@ -269,6 +271,7 @@ type ( Startup *cos.Duration `json:"startup_time,omitempty"` JoinAtStartup *cos.Duration `json:"join_startup_time,omitempty"` SendFile *cos.Duration `json:"send_file_time,omitempty"` + EcStreams *cos.Duration `json:"ec_streams_time,omitempty"` } ClientConf struct { @@ -480,10 +483,10 @@ type ( // and errors returned by remote backends; // - exceeding this limit is also an FSHC-trggering event; subsequently, // if FSHC confirms the problem it will disable the mountpath (see above) - IOErrs int `json:"io_err_limit"` + IOErrs int `json:"io_err_limit,omitempty"` // time interval (in seconds) to accumulate soft errors; // the total number by the end of the interval must not exceed `IOErrs` (above) - IOErrTime cos.Duration `json:"io_err_time"` + IOErrTime cos.Duration `json:"io_err_time,omitempty"` // whether FSHC is enabled (note: disabling FSHC is _not_ recommended) Enabled bool `json:"enabled"` @@ -507,18 +510,7 @@ type ( Enabled *bool `json:"enabled,omitempty"` } - // keepalive tracker - KeepaliveTrackerConf struct { - Name string `json:"name"` // "heartbeat" (other enumerated values TBD) - Interval cos.Duration `json:"interval"` // keepalive interval - Factor uint8 `json:"factor"` // only average - } - KeepaliveTrackerConfToSet struct { - Interval *cos.Duration `json:"interval,omitempty"` - Name *string `json:"name,omitempty" list:"readonly"` - Factor *uint8 `json:"factor,omitempty"` - } - + // keepalive KeepaliveConf struct { Proxy KeepaliveTrackerConf `json:"proxy"` // how proxy 
tracks target keepalives Target KeepaliveTrackerConf `json:"target"` // how target tracks primary proxies keepalives @@ -529,6 +521,16 @@ type ( Target *KeepaliveTrackerConfToSet `json:"target,omitempty"` RetryFactor *uint8 `json:"retry_factor,omitempty"` } + KeepaliveTrackerConf struct { + Name string `json:"name"` // "heartbeat" + Interval cos.Duration `json:"interval"` // keepalive interval + Factor uint8 `json:"factor"` // only average + } + KeepaliveTrackerConfToSet struct { + Interval *cos.Duration `json:"interval,omitempty"` + Name *string `json:"name,omitempty" list:"readonly"` + Factor *uint8 `json:"factor,omitempty"` + } DownloaderConf struct { Timeout cos.Duration `json:"timeout"` @@ -1650,6 +1652,12 @@ func (c *TCBConf) Validate() error { // TimeoutConf // ///////////////// +const ( + EcStreamsEver = -time.Second + EcStreamsDflt = 10 * time.Minute + EcStreamsMini = 5 * time.Minute +) + func (c *TimeoutConf) Validate() error { if c.CplaneOperation.D() < 10*time.Millisecond { return fmt.Errorf("invalid timeout.cplane_operation=%s", c.CplaneOperation) @@ -1671,6 +1679,11 @@ func (c *TimeoutConf) Validate() error { if c.SendFile.D() < time.Minute { return fmt.Errorf("invalid timeout.send_file_time=%s (cannot be less than 1m)", c.SendFile) } + // must be greater than (2 * keepalive.interval*keepalive.factor) + if c.EcStreams > 0 && c.EcStreams.D() < EcStreamsMini { + return fmt.Errorf("invalid timeout.ec_streams_time=%s (never timeout: %v; minimum: %s; default: %s)", + c.EcStreams, EcStreamsEver, EcStreamsMini, EcStreamsDflt) + } return nil } diff --git a/cmn/rom.go b/cmn/rom.go index 7a51eba63b..075c36ffd0 100644 --- a/cmn/rom.go +++ b/cmn/rom.go @@ -17,7 +17,8 @@ import ( type readMostly struct { timeout struct { cplane time.Duration // Config.Timeout.CplaneOperation - keepalive time.Duration // ditto MaxKeepalive + keepalive time.Duration // MaxKeepalive + ecstreams time.Duration // EcStreams } features feat.Flags level, modules int @@ -30,11 +31,15 @@ 
var Rom readMostly func (rom *readMostly) init() { rom.timeout.cplane = time.Second + time.Millisecond rom.timeout.keepalive = 2*time.Second + time.Millisecond + rom.timeout.ecstreams = EcStreamsDflt } func (rom *readMostly) Set(cfg *ClusterConfig) { rom.timeout.cplane = cfg.Timeout.CplaneOperation.D() rom.timeout.keepalive = cfg.Timeout.MaxKeepalive.D() + if d := cfg.Timeout.EcStreams; d != 0 { + rom.timeout.ecstreams = d.D() + } rom.features = cfg.Features rom.authEnabled = cfg.Auth.Enabled @@ -44,6 +49,7 @@ func (rom *readMostly) Set(cfg *ClusterConfig) { func (rom *readMostly) CplaneOperation() time.Duration { return rom.timeout.cplane } func (rom *readMostly) MaxKeepalive() time.Duration { return rom.timeout.keepalive } +func (rom *readMostly) EcStreams() time.Duration { return rom.timeout.ecstreams } func (rom *readMostly) Features() feat.Flags { return rom.features } func (rom *readMostly) TestingEnv() bool { return rom.testingEnv } func (rom *readMostly) AuthEnabled() bool { return rom.authEnabled } diff --git a/core/meta/bck.go b/core/meta/bck.go index 03bfdd9744..a7a255ed3c 100644 --- a/core/meta/bck.go +++ b/core/meta/bck.go @@ -187,10 +187,11 @@ func (b *Bck) init(bmd *BMD) error { func InitByNameOnly(bckName string, bowner Bowner) (bck *Bck, err error, ecode int) { bmd := bowner.Get() all := bmd.getAllByName(bckName) - if all == nil { + switch { + case all == nil: err = cmn.NewErrBckNotFound(&cmn.Bck{Name: bckName}) ecode = http.StatusNotFound - } else if len(all) == 1 { + case len(all) == 1: bck = &all[0] if bck.Props == nil { err = cmn.NewErrBckNotFound(bck.Bucket()) @@ -199,12 +200,11 @@ func InitByNameOnly(bckName string, bowner Bowner) (bck *Bck, err error, ecode i debug.Assert(apc.IsRemoteProvider(backend.Provider)) err = backend.init(bmd) } - } else { - err = fmt.Errorf("cannot unambiguously resolve bucket name %q to a single bucket (%v)", - bckName, all) + default: + err = fmt.Errorf("cannot unambiguously resolve bucket name %q to a single 
bucket (%v)", bckName, all) ecode = http.StatusUnprocessableEntity } - return + return bck, err, ecode } func (b *Bck) CksumConf() (conf *cmn.CksumConf) { return &b.Props.Cksum } diff --git a/deploy/dev/local/aisnode_config.fspaths.sh b/deploy/dev/local/aisnode_config.fspaths.sh index 07ceca4a47..92ccdcf0cb 100755 --- a/deploy/dev/local/aisnode_config.fspaths.sh +++ b/deploy/dev/local/aisnode_config.fspaths.sh @@ -37,6 +37,7 @@ cat > $AIS_CONF_FILE < $AIS_CONF_FILE <= 0, "rc: ", rc) } -func (mgr *Manager) initECBundles() error { - if !mgr.bundleEnabled.CAS(false, true) { - return nil - } - if err := transport.Handle(ReqStreamName, ECM.recvRequest); err != nil { - return fmt.Errorf("failed to register recvRequest: %v", err) - } - if err := transport.Handle(RespStreamName, ECM.recvResponse); err != nil { - return fmt.Errorf("failed to register respResponse: %v", err) +func cbReq(hdr *transport.ObjHdr, _ io.ReadCloser, _ any, err error) { + if err != nil { + nlog.Errorf("failed to request %s: %v", hdr.Cname(), err) } - cbReq := func(hdr *transport.ObjHdr, _ io.ReadCloser, _ any, err error) { - if err != nil { - nlog.Errorf("failed to request %s: %v", hdr.Cname(), err) - } +} + +func (mgr *Manager) OpenStreams() { + if !mgr.bundleEnabled.CAS(false, true) { + return } var ( client = transport.NewIntraDataClient() @@ -114,18 +117,14 @@ func (mgr *Manager) initECBundles() error { mgr.reqBundle.Store(bundle.New(client, reqSbArgs)) mgr.respBundle.Store(bundle.New(client, respSbArgs)) - - return nil } -func (mgr *Manager) closeECBundles() { +func (mgr *Manager) CloseStreams() { if !mgr.bundleEnabled.CAS(true, false) { return } mgr.req().Close(false) mgr.resp().Close(false) - transport.Unhandle(ReqStreamName) - transport.Unhandle(RespStreamName) } func (mgr *Manager) NewGetXact(bck *cmn.Bck) *XactGet { return newGetXact(bck, mgr) } @@ -319,13 +318,11 @@ func (mgr *Manager) BMDChanged() error { } mgr.bmd = newBMD - // globally + // TODO -- FIXME: remove if newBMD.IsECUsed() && 
!oldBMD.IsECUsed() { - if err := mgr.initECBundles(); err != nil { - return err - } + mgr.OpenStreams() } else if !newBMD.IsECUsed() && oldBMD.IsECUsed() { - mgr.closeECBundles() + mgr.CloseStreams() return nil } diff --git a/ec/metafile.go b/ec/metafile.go index 3c3414a056..4e7366525e 100644 --- a/ec/metafile.go +++ b/ec/metafile.go @@ -1,6 +1,6 @@ // Package ec provides erasure coding (EC) based data protection for AIStore. /* - * Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved. */ package ec