Skip to content

Commit

Permalink
[config change] close EC streams when idle, reopen on demand (major)
Browse files Browse the repository at this point in the history
* cluster config: add "ec_streams_time"
* proxy: `onEC` when initializing bucket
  - s3 and, separately, native API
* new sources: ais/prxec and ais/tgtec
  - add target /v1/ec endpoint
* refactor; reduce copy/paste; remove unused code
* miscellaneous micro-optimizations
* part two, prev. commit: d8a71bb

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Sep 9, 2024
1 parent 350bbff commit 1114868
Show file tree
Hide file tree
Showing 28 changed files with 483 additions and 370 deletions.
7 changes: 4 additions & 3 deletions ais/htrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ func (h *htrun) _call(si *meta.Snode, bargs *bcastArgs, results *bcastResults) {
freeCargs(cargs)
}

const _callHdrLen = 5
const lenhdr = 5

func (h *htrun) call(args *callArgs, smap *smapX) (res *callResult) {
var (
Expand Down Expand Up @@ -686,7 +686,7 @@ func (h *htrun) call(args *callArgs, smap *smapX) (res *callResult) {

// req header
if args.req.Header == nil {
args.req.Header = make(http.Header, _callHdrLen)
args.req.Header = make(http.Header, lenhdr)
}
if smap.vstr != "" {
if smap.IsPrimary(h.si) {
Expand Down Expand Up @@ -1956,7 +1956,8 @@ func (h *htrun) fastKalive(smap *smapX, timeout time.Duration, ecActive bool) (p
cargs.timeout = timeout
}
if ecActive {
hdr := make(http.Header, _callHdrLen)
// (target => primary)
hdr := make(http.Header, lenhdr)
hdr.Set(apc.HdrActiveEC, "true")
cargs.req.Header = hdr
}
Expand Down
2 changes: 1 addition & 1 deletion ais/kalive.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (pkr *palive) sendKalive(smap *smapX, timeout time.Duration, now int64, fas
// check resp header from primary
// (see: _respActiveEC; compare with: _recvActiveEC)
if isActiveEC(hdr) {
pkr.p.lastEC.Store(now)
pkr.p.ec.last.Store(now)
}
}
return pid, 0, err
Expand Down
37 changes: 24 additions & 13 deletions ais/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ type (
mu sync.RWMutex
in atomic.Bool
}
lastEC atomic.Int64 // last active EC via apc.HdrActiveEC (mono time)

ec struct {
last atomic.Int64 // last active EC via apc.HdrActiveEC (mono time)
rust int64 // same as above
}
settingNewPrimary atomic.Bool // primary executing "set new primary" request (state)
readyToFastKalive atomic.Bool // primary can accept fast keepalives
}
Expand Down Expand Up @@ -224,6 +226,7 @@ func (p *proxy) Run() error {
{r: apc.Vote, h: p.voteHandler, net: accessNetIntraControl},

{r: apc.Notifs, h: p.notifs.handler, net: accessNetIntraControl},
{r: apc.EC, h: p.ecHandler, net: accessNetIntraControl},

// S3 compatibility
{r: "/" + apc.S3, h: p.s3Handler, net: accessNetPublic},
Expand Down Expand Up @@ -734,6 +737,8 @@ func (p *proxy) httpobjget(w http.ResponseWriter, r *http.Request, origURLBck ..
return
}

started := time.Now()

// 3. redirect
smap := p.owner.smap.get()
tsi, netPub, err := smap.HrwMultiHome(bck.MakeUname(objName))
Expand All @@ -742,9 +747,10 @@ func (p *proxy) httpobjget(w http.ResponseWriter, r *http.Request, origURLBck ..
return
}
if cmn.Rom.FastV(5, cos.SmoduleAIS) {
nlog.Infoln("GET " + bck.Cname(objName) + " => " + tsi.String())
nlog.Infoln("GET", bck.Cname(objName), "=>", tsi.StringEx())
}
redirectURL := p.redirectURL(r, tsi, time.Now() /*started*/, cmn.NetIntraData, netPub)

redirectURL := p.redirectURL(r, tsi, started, cmn.NetIntraData, netPub)
http.Redirect(w, r, redirectURL, http.StatusMovedPermanently)

// 4. stats
Expand Down Expand Up @@ -816,14 +822,15 @@ func (p *proxy) httpobjput(w http.ResponseWriter, r *http.Request, apireq *apiRe

// verbose
if cmn.Rom.FastV(5, cos.SmoduleAIS) {
verb, s := "PUT", ""
verb := "PUT"
if appendTyProvided {
verb = "APPEND"
}
var s string
if bck.Props.Mirror.Enabled {
s = " (put-mirror)"
}
nlog.Infof("%s %s => %s%s", verb, bck.Cname(objName), tsi.StringEx(), s)
nlog.Infoln(verb, bck.Cname(objName), "=>", tsi.StringEx(), s)
}

redirectURL := p.redirectURL(r, tsi, started, cmn.NetIntraData, netPub)
Expand Down Expand Up @@ -858,7 +865,7 @@ func (p *proxy) httpobjdelete(w http.ResponseWriter, r *http.Request) {
return
}
if cmn.Rom.FastV(5, cos.SmoduleAIS) {
nlog.Infoln("DELETE " + bck.Cname(objName) + " => " + tsi.StringEx())
nlog.Infoln("DELETE", bck.Cname(objName), "=>", tsi.StringEx())
}
redirectURL := p.redirectURL(r, tsi, time.Now() /*started*/, cmn.NetIntraControl)
http.Redirect(w, r, redirectURL, http.StatusTemporaryRedirect)
Expand Down Expand Up @@ -1457,7 +1464,11 @@ func (p *proxy) initBckTo(w http.ResponseWriter, r *http.Request, query url.Valu
bckToArgs.createAIS = true

ecode, err := bckToArgs.init()
if err != nil && ecode != http.StatusNotFound {
if err == nil {
return bckTo, 0, p.onEC(bckTo) // compare with `initAndTry`
}

if ecode != http.StatusNotFound {
p.writeErr(w, r, err, ecode)
return nil, 0, err
}
Expand Down Expand Up @@ -1697,7 +1708,7 @@ func (p *proxy) lsPage(bck *meta.Bck, amsg *apc.ActMsg, lsmsg *apc.LsoMsg, hdr h
if lsmsg.SID != "" {
s += " via " + tsi.StringEx()
}
nlog.Infof("%s[%s] %s%s", amsg.Action, lsmsg.UUID, bck.Cname(""), s)
nlog.Infoln(amsg.Action, "[", lsmsg.UUID, "]", bck.Cname(""), s)
}

config := cmn.GCO.Get()
Expand Down Expand Up @@ -2079,7 +2090,7 @@ func (p *proxy) httpobjhead(w http.ResponseWriter, r *http.Request, origURLBck .
return
}
if cmn.Rom.FastV(5, cos.SmoduleAIS) {
nlog.Infof("%s %s => %s", r.Method, bck.Cname(objName), si.StringEx())
nlog.Infoln(r.Method, bck.Cname(objName), "=>", si.StringEx())
}
redirectURL := p.redirectURL(r, si, time.Now() /*started*/, cmn.NetIntraControl)
http.Redirect(w, r, redirectURL, http.StatusTemporaryRedirect)
Expand Down Expand Up @@ -2107,7 +2118,7 @@ func (p *proxy) httpobjpatch(w http.ResponseWriter, r *http.Request) {
return
}
if cmn.Rom.FastV(5, cos.SmoduleAIS) {
nlog.Infof("%s %s => %s", r.Method, bck.Cname(objName), si.StringEx())
nlog.Infoln(r.Method, bck.Cname(objName), "=>", si.StringEx())
}
redirectURL := p.redirectURL(r, si, started, cmn.NetIntraControl)
http.Redirect(w, r, redirectURL, http.StatusTemporaryRedirect)
Expand Down Expand Up @@ -2409,7 +2420,7 @@ func (p *proxy) redirectObjAction(w http.ResponseWriter, r *http.Request, bck *m
return
}
if cmn.Rom.FastV(5, cos.SmoduleAIS) {
nlog.Infof("%q %s => %s", msg.Action, bck.Cname(objName), si.StringEx())
nlog.Infoln(msg.Action, bck.Cname(objName), "=>", si.StringEx())
}

// NOTE: Code 307 is the only way to http-redirect with the original JSON payload.
Expand Down Expand Up @@ -3135,7 +3146,7 @@ func (p *proxy) htHandler(w http.ResponseWriter, r *http.Request) {
}
baseURL := r.URL.Scheme + "://" + r.URL.Host
if cmn.Rom.FastV(5, cos.SmoduleAIS) {
nlog.Infof("[HTTP CLOUD] RevProxy handler for: %s -> %s", baseURL, r.URL.Path)
nlog.Infoln("[HTTP CLOUD] RevProxy handler:", baseURL, "-->", r.URL.Path)
}
if r.Method == http.MethodGet || r.Method == http.MethodHead {
// bck.IsHT()
Expand Down
38 changes: 17 additions & 21 deletions ais/prxbck.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,20 +118,18 @@ func (bctx *bctx) init() (ecode int, err error) {
}

if err = bctx.accessSupported(); err != nil {
ecode = http.StatusMethodNotAllowed
return
return http.StatusMethodNotAllowed, err
}
if bctx.skipBackend {
err = bck.InitNoBackend(bctx.p.owner.bmd)
} else {
err = bck.Init(bctx.p.owner.bmd)
}
if err != nil {
ecode = http.StatusBadRequest
if cmn.IsErrBucketNought(err) {
ecode = http.StatusNotFound
return http.StatusNotFound, err
}
return
return http.StatusBadRequest, err
}

bctx.isPresent = true
Expand Down Expand Up @@ -195,6 +193,7 @@ func (bctx *bctx) initAndTry() (bck *meta.Bck, err error) {
// 1. init bucket
bck = bctx.bck
if ecode, err = bctx.init(); err == nil {
err = bctx.p.onEC(bck)
return
}
if ecode != http.StatusNotFound {
Expand Down Expand Up @@ -239,7 +238,7 @@ func (bctx *bctx) try() (bck *meta.Bck, err error) {
case cmn.IsErrBucketAlreadyExists(err):
// e.g., when (re)setting backend two times in a row
// TODO: return http.StatusNoContent
nlog.Infoln(bctx.p.String()+":", err, " - nothing to do")
nlog.Infoln(bctx.p.String(), err, " - nothing to do")
return bck, nil
default:
if bctx.perms == apc.AceBckHEAD {
Expand All @@ -257,13 +256,10 @@ func (bctx *bctx) try() (bck *meta.Bck, err error) {

func (bctx *bctx) _try() (bck *meta.Bck, ecode int, err error) {
if err = bctx.bck.Validate(); err != nil {
ecode = http.StatusBadRequest
return
return bck, http.StatusBadRequest, err
}

if bctx.p.forwardCP(bctx.w, bctx.r, bctx.msg, "add-bucket", bctx.reqBody) {
err = errForwarded
return
return bck, 0, errForwarded
}

// am primary from this point on
Expand All @@ -277,8 +273,7 @@ func (bctx *bctx) _try() (bck *meta.Bck, ecode int, err error) {
}
if bck.IsAIS() {
if err = bctx.p.access(bctx.r.Header, nil /*bck*/, apc.AceCreateBucket); err != nil {
ecode = aceErrToCode(err)
return
return bck, aceErrToCode(err), err
}
nlog.Warningf("%s: %q doesn't exist, proceeding to create", bctx.p, bctx.bck)
goto creadd
Expand All @@ -287,8 +282,7 @@ func (bctx *bctx) _try() (bck *meta.Bck, ecode int, err error) {

// lookup remote
if remoteHdr, ecode, err = bctx.lookup(bck); err != nil {
bck = nil
return
return nil, ecode, err
}

// orig-url for the ht:// bucket
Expand All @@ -301,8 +295,7 @@ func (bctx *bctx) _try() (bck *meta.Bck, ecode int, err error) {
origURL = bctx.getOrigURL()
)
if origURL == "" {
err = cmn.NewErrFailedTo(bctx.p, "initialize", bctx.bck, errors.New("missing HTTP URL"))
return
return bck, 0, cmn.NewErrFailedTo(bctx.p, "initialize", bctx.bck, errors.New("missing HTTP URL"))
}
if hbo, err = cmn.NewHTTPObjPath(origURL); err != nil {
return
Expand Down Expand Up @@ -336,15 +329,18 @@ func (bctx *bctx) _try() (bck *meta.Bck, ecode int, err error) {
// add/create
creadd:
if err = bctx.p.createBucket(&apc.ActMsg{Action: action}, bck, remoteHdr); err != nil {
ecode = crerrStatus(err)
return
return bck, crerrStatus(err), err
}

// finally, initialize the newly added/created
if err = bck.Init(bctx.p.owner.bmd); err != nil {
debug.AssertNoErr(err)
ecode = http.StatusInternalServerError
err = cmn.NewErrFailedTo(bctx.p, "post create-bucket init", bck, err, ecode)
return bck, http.StatusInternalServerError,
cmn.NewErrFailedTo(bctx.p, "post create-bucket init", bck, err, ecode)
}

err = bctx.p.onEC(bck)

bck = bctx.bck
return
}
Expand Down
30 changes: 1 addition & 29 deletions ais/prxclu.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func (p *proxy) httpclupost(w http.ResponseWriter, r *http.Request) {

switch apiOp {
case apc.Keepalive:
// fast path(?)
// fast path
if len(apiItems) > 1 {
p.fastKaliveRsp(w, r, smap, config, apiItems[1] /*sid*/)
return
Expand Down Expand Up @@ -2259,31 +2259,3 @@ func mustRebalance(ctx *smapModifier, cur *smapX) bool {
}
return false
}

//
// (EC is active) and (is EC active?) via fastKalive
//

// ecTimeout is the default for config.Timeout.EcStreams.
// _respActiveEC compares the time elapsed since the last recorded
// EC activity against this value.
const ecTimeout = 10 * time.Minute

// isActiveEC reports whether hdr carries the apc.HdrActiveEC key.
// Only the presence of the key matters; its value is not inspected.
// The lookup is a direct map index, so the key is matched exactly as is.
func isActiveEC(hdr http.Header) (ok bool) {
	if _, present := hdr[apc.HdrActiveEC]; present {
		return true
	}
	return false
}

// (target send => primary)
// _recvActiveEC records `now` as the last-active-EC timestamp when the
// received header rhdr carries the active-EC marker (see isActiveEC);
// otherwise it leaves the stored timestamp untouched.
func (p *proxy) _recvActiveEC(rhdr http.Header, now int64) {
	if !isActiveEC(rhdr) {
		return
	}
	p.lastEC.Store(now)
}

// (primary resp => non-primary)
// _respActiveEC sets apc.HdrActiveEC to "true" on the response header whdr
// when less than ecTimeout has elapsed between the last recorded EC
// activity and `now`; otherwise whdr is left unchanged.
func (p *proxy) _respActiveEC(whdr http.Header, now int64) {
	sinceLast := time.Duration(now - p.lastEC.Load())
	if sinceLast >= ecTimeout {
		return
	}
	whdr.Set(apc.HdrActiveEC, "true")
}
Loading

0 comments on commit 1114868

Please sign in to comment.