From cdbb828018e244f235234464a835c308cf48c178 Mon Sep 17 00:00:00 2001 From: Alex Aizman Date: Sat, 22 Jun 2024 09:56:26 -0400 Subject: [PATCH] etl: simplify and refactor 'inline | offline' transforms * push, redirect, and reverse Signed-off-by: Alex Aizman --- ais/target.go | 2 +- ais/tgtetl.go | 4 +- core/target.go | 2 +- ext/etl/comm_internal_test.go | 2 +- ext/etl/communicator.go | 96 +++++++++++++++-------------------- ext/etl/dp.go | 2 +- 6 files changed, 46 insertions(+), 62 deletions(-) diff --git a/ais/target.go b/ais/target.go index 2738c616c9..a688353c0b 100644 --- a/ais/target.go +++ b/ais/target.go @@ -690,7 +690,7 @@ func (t *target) getObject(w http.ResponseWriter, r *http.Request, dpq *dpq, bck // two special flows if dpq.etlName != "" { - t.getETL(w, r, dpq.etlName, bck, lom.ObjName) + t.getETL(w, r, dpq.etlName, lom) return lom, nil } if cos.IsParseBool(r.Header.Get(apc.HdrBlobDownload)) { diff --git a/ais/tgtetl.go b/ais/tgtetl.go index d99f5b0fbd..50e98cedd4 100644 --- a/ais/tgtetl.go +++ b/ais/tgtetl.go @@ -152,7 +152,7 @@ func (t *target) stopETL(w http.ResponseWriter, r *http.Request, etlName string) } } -func (t *target) getETL(w http.ResponseWriter, r *http.Request, etlName string, bck *meta.Bck, objName string) { +func (t *target) getETL(w http.ResponseWriter, r *http.Request, etlName string, lom *core.LOM) { var ( comm etl.Communicator err error @@ -169,7 +169,7 @@ func (t *target) getETL(w http.ResponseWriter, r *http.Request, etlName string, t.writeErr(w, r, err) return } - if err := comm.InlineTransform(w, r, bck, objName); err != nil { + if err := comm.InlineTransform(w, r, lom); err != nil { errV := cmn.NewErrETL(&cmn.ETLErrCtx{ETLName: etlName, PodName: comm.PodName(), SvcName: comm.SvcName()}, err.Error()) xetl := comm.Xact() diff --git a/core/target.go b/core/target.go index d1a40470ca..2bac0e2b1a 100644 --- a/core/target.go +++ b/core/target.go @@ -52,7 +52,7 @@ type ( apc.PromoteArgs // all of the above } CopyParams struct { - DP DP // transform via: ext/etl/dp.go or core/ldp.go + DP DP // copy or transform via data provider, see impl-s: (ext/etl/dp.go, core/ldp.go) Xact Xact Config *cmn.Config BckTo *meta.Bck diff --git a/ext/etl/comm_internal_test.go b/ext/etl/comm_internal_test.go index 31ea0ec683..2f4cefdf11 100644 --- a/ext/etl/comm_internal_test.go +++ b/ext/etl/comm_internal_test.go @@ -91,7 +91,7 @@ var _ = Describe("CommunicatorTest", func() { Expect(err).NotTo(HaveOccurred()) })) targetServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - err := comm.InlineTransform(w, r, clusterBck, objName) + err := comm.InlineTransform(w, r, lom) Expect(err).NotTo(HaveOccurred()) })) proxyServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/ext/etl/communicator.go b/ext/etl/communicator.go index a8edd3aa3e..3969194668 100644 --- a/ext/etl/communicator.go +++ b/ext/etl/communicator.go @@ -46,13 +46,16 @@ type ( // InlineTransform uses one of the two ETL container endpoints: // - Method "PUT", Path "/" // - Method "GET", Path "/bucket/object" - InlineTransform(w http.ResponseWriter, r *http.Request, bck *meta.Bck, objName string) error + InlineTransform(w http.ResponseWriter, r *http.Request, lom *core.LOM) error + + // OfflineTransform is driven by `OfflineDP` to provide offline transformation, as it were + // Implementations include: + // - pushComm + // - redirectComm + // - revProxyComm + // See also, and separately: on-the-fly transformation as part of a user (e.g. training model) GET request handling + OfflineTransform(lom *core.LOM, timeout time.Duration) (cos.ReadCloseSizer, error) - // OfflineTransform interface implementations realize offline ETL. - // OfflineTransform is driven by `OfflineDP` - not to confuse - // with GET requests from users (such as training models and apps) - // to perform on-the-fly transformation. - OfflineTransform(bck *meta.Bck, objName string, timeout time.Duration) (cos.ReadCloseSizer, error) Stop() CommStats @@ -195,17 +198,17 @@ func (c *baseComm) getWithTimeout(url string, size int64, timeout time.Duration) // pushComm: implements (Hpush | HpushStdin) ////////////// -func (pc *pushComm) doRequest(bck *meta.Bck, lom *core.LOM, timeout time.Duration) (r cos.ReadCloseSizer, err error) { - var ecode int - if err := lom.InitBck(bck.Bucket()); err != nil { +func (pc *pushComm) doRequest(lom *core.LOM, timeout time.Duration) (r cos.ReadCloseSizer, err error) { + if err := lom.InitBck(lom.Bucket()); err != nil { return nil, err } + var ecode int lom.Lock(false) r, ecode, err = pc.do(lom, timeout) lom.Unlock(false) - if err != nil && cos.IsNotExist(err, ecode) && bck.IsRemote() { + if err != nil && cos.IsNotExist(err, ecode) && lom.Bucket().IsRemote() { _, err = core.T.GetCold(context.Background(), lom, cmn.OwtGetLock) if err != nil { return nil, err @@ -304,10 +307,8 @@ finish: return cos.NewReaderWithArgs(args), 0, nil } -func (pc *pushComm) InlineTransform(w http.ResponseWriter, _ *http.Request, bck *meta.Bck, objName string) error { - lom := core.AllocLOM(objName) - r, err := pc.doRequest(bck, lom, 0 /*timeout*/) - core.FreeLOM(lom) +func (pc *pushComm) InlineTransform(w http.ResponseWriter, _ *http.Request, lom *core.LOM) error { + r, err := pc.doRequest(lom, 0 /*timeout*/) if err != nil { return err } @@ -327,13 +328,12 @@ func (pc *pushComm) InlineTransform(w http.ResponseWriter, _ *http.Request, bck return err } -func (pc *pushComm) OfflineTransform(bck *meta.Bck, objName string, timeout time.Duration) (r cos.ReadCloseSizer, err error) { - lom := core.AllocLOM(objName) - r, err = pc.doRequest(bck, lom, timeout) +func (pc *pushComm) OfflineTransform(lom *core.LOM, timeout time.Duration) (r cos.ReadCloseSizer, err error) { + clone := *lom + r, err = pc.doRequest(&clone, timeout) if err == nil && cmn.Rom.FastV(5, cos.SmoduleETL) { - nlog.Infoln(Hpush, lom.Cname(), err) + nlog.Infoln(Hpush, clone.Cname(), err) } - core.FreeLOM(lom) return } @@ -341,34 +341,29 @@ func (pc *pushComm) OfflineTransform(bck *meta.Bck, objName string, timeout time // redirectComm: implements Hpull ////////////////// -func (rc *redirectComm) InlineTransform(w http.ResponseWriter, r *http.Request, bck *meta.Bck, objName string) error { +func (rc *redirectComm) InlineTransform(w http.ResponseWriter, r *http.Request, lom *core.LOM) error { if err := rc.boot.xctn.AbortErr(); err != nil { return err } - - lom := core.AllocLOM(objName) - size, err := lomLoad(lom, bck) + size, err := lomLoad(lom) if err != nil { - core.FreeLOM(lom) return err } if size > 0 { rc.boot.xctn.OutObjsAdd(1, size) } - http.Redirect(w, r, rc.redirectURL(lom), http.StatusTemporaryRedirect) if cmn.Rom.FastV(5, cos.SmoduleETL) { nlog.Infoln(Hpull, lom.Cname()) } - core.FreeLOM(lom) return nil } func (rc *redirectComm) redirectURL(lom *core.LOM) string { switch rc.boot.msg.ArgTypeX { case ArgTypeDefault, ArgTypeURL: - return cos.JoinPath(rc.boot.uri, transformerPath(lom.Bck(), lom.ObjName)) + return cos.JoinPath(rc.boot.uri, transformerPath(lom)) case ArgTypeFQN: return cos.JoinPath(rc.boot.uri, url.PathEscape(lom.FQN)) } @@ -376,21 +371,19 @@ func (rc *redirectComm) redirectURL(lom *core.LOM) string { return "" } -func (rc *redirectComm) OfflineTransform(bck *meta.Bck, objName string, timeout time.Duration) (cos.ReadCloseSizer, error) { - lom := core.AllocLOM(objName) - size, errV := lomLoad(lom, bck) +func (rc *redirectComm) OfflineTransform(lom *core.LOM, timeout time.Duration) (cos.ReadCloseSizer, error) { + clone := *lom + size, errV := lomLoad(&clone) if errV != nil { - core.FreeLOM(lom) return nil, errV } - etlURL := rc.redirectURL(lom) + etlURL := rc.redirectURL(&clone) r, err := rc.getWithTimeout(etlURL, size, timeout) if cmn.Rom.FastV(5, cos.SmoduleETL) { - nlog.Infoln(Hpull, lom.Cname(), err) + nlog.Infoln(Hpull, clone.Cname(), err) } - core.FreeLOM(lom) return r, err } @@ -398,18 +391,15 @@ func (rc *redirectComm) OfflineTransform(bck *meta.Bck, objName string, timeout // revProxyComm: implements Hrev ////////////////// -func (rp *revProxyComm) InlineTransform(w http.ResponseWriter, r *http.Request, bck *meta.Bck, objName string) error { - lom := core.AllocLOM(objName) - size, err := lomLoad(lom, bck) +func (rp *revProxyComm) InlineTransform(w http.ResponseWriter, r *http.Request, lom *core.LOM) error { + size, err := lomLoad(lom) if err != nil { - core.FreeLOM(lom) return err } if size > 0 { rp.boot.xctn.OutObjsAdd(1, size) } - path := transformerPath(bck, objName) - core.FreeLOM(lom) + path := transformerPath(lom) r.URL.Path, _ = url.PathUnescape(path) // `Path` must be unescaped otherwise it will be escaped again. r.URL.RawPath = path // `RawPath` should be escaped version of `Path`. @@ -418,20 +408,18 @@ func (rp *revProxyComm) InlineTransform(w http.ResponseWriter, r *http.Request, return nil } -func (rp *revProxyComm) OfflineTransform(bck *meta.Bck, objName string, timeout time.Duration) (cos.ReadCloseSizer, error) { - lom := core.AllocLOM(objName) - size, errV := lomLoad(lom, bck) +func (rp *revProxyComm) OfflineTransform(lom *core.LOM, timeout time.Duration) (cos.ReadCloseSizer, error) { + clone := *lom + size, errV := lomLoad(&clone) if errV != nil { - core.FreeLOM(lom) return nil, errV } - etlURL := cos.JoinPath(rp.boot.uri, transformerPath(bck, objName)) + etlURL := cos.JoinPath(rp.boot.uri, transformerPath(&clone)) r, err := rp.getWithTimeout(etlURL, size, timeout) if cmn.Rom.FastV(5, cos.SmoduleETL) { - nlog.Infoln(Hrev, lom.Cname(), err) + nlog.Infoln(Hrev, clone.Cname(), err) } - core.FreeLOM(lom) return r, err } @@ -467,21 +455,17 @@ func pruneQuery(rawQuery string) string { // - url.PathEscape(uname) - see below - versus // - Bck().Name + "/" + lom.ObjName - see pushComm above - versus // - bck.AddToQuery() elsewhere -func transformerPath(bck *meta.Bck, objName string) string { - uname := bck.MakeUname(objName) - return "/" + url.PathEscape(cos.UnsafeS(uname)) +func transformerPath(lom *core.LOM) string { + return "/" + url.PathEscape(lom.Uname()) } -func lomLoad(lom *core.LOM, bck *meta.Bck) (size int64, err error) { - if err = lom.InitBck(bck.Bucket()); err != nil { - return - } +func lomLoad(lom *core.LOM) (size int64, err error) { if err = lom.Load(true /*cacheIt*/, false /*locked*/); err != nil { - if cos.IsNotExist(err, 0) && bck.IsRemote() { + if cos.IsNotExist(err, 0) && lom.Bucket().IsRemote() { err = nil // NOTE: size == 0 } } else { size = lom.SizeBytes() } - return + return size, err } diff --git a/ext/etl/dp.go b/ext/etl/dp.go index 2ad0225613..ea8cef3c2f 100644 --- a/ext/etl/dp.go +++ b/ext/etl/dp.go @@ -49,7 +49,7 @@ func (dp *OfflineDP) Reader(lom *core.LOM, latestVer, sync bool) (cos.ReadOpenCl ) debug.Assert(!latestVer && !sync, "NIY") // TODO -- FIXME call := func() (int, error) { - r, err = dp.comm.OfflineTransform(lom.Bck(), lom.ObjName, dp.requestTimeout) + r, err = dp.comm.OfflineTransform(lom, dp.requestTimeout) return 0, err } // TODO: Check if ETL pod is healthy and wait some more if not (yet).