Skip to content

Commit

Permalink
etl: simplify and refactor 'inline | offline' transforms
Browse files Browse the repository at this point in the history
* push, redirect, and reverse

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jun 22, 2024
1 parent 3816bfb commit cdbb828
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 62 deletions.
2 changes: 1 addition & 1 deletion ais/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ func (t *target) getObject(w http.ResponseWriter, r *http.Request, dpq *dpq, bck

// two special flows
if dpq.etlName != "" {
t.getETL(w, r, dpq.etlName, bck, lom.ObjName)
t.getETL(w, r, dpq.etlName, lom)
return lom, nil
}
if cos.IsParseBool(r.Header.Get(apc.HdrBlobDownload)) {
Expand Down
4 changes: 2 additions & 2 deletions ais/tgtetl.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (t *target) stopETL(w http.ResponseWriter, r *http.Request, etlName string)
}
}

func (t *target) getETL(w http.ResponseWriter, r *http.Request, etlName string, bck *meta.Bck, objName string) {
func (t *target) getETL(w http.ResponseWriter, r *http.Request, etlName string, lom *core.LOM) {
var (
comm etl.Communicator
err error
Expand All @@ -169,7 +169,7 @@ func (t *target) getETL(w http.ResponseWriter, r *http.Request, etlName string,
t.writeErr(w, r, err)
return
}
if err := comm.InlineTransform(w, r, bck, objName); err != nil {
if err := comm.InlineTransform(w, r, lom); err != nil {
errV := cmn.NewErrETL(&cmn.ETLErrCtx{ETLName: etlName, PodName: comm.PodName(), SvcName: comm.SvcName()},
err.Error())
xetl := comm.Xact()
Expand Down
2 changes: 1 addition & 1 deletion core/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type (
apc.PromoteArgs // all of the above
}
CopyParams struct {
DP DP // transform via: ext/etl/dp.go or core/ldp.go
DP DP // copy or transform via data provider; see implementations in ext/etl/dp.go and core/ldp.go
Xact Xact
Config *cmn.Config
BckTo *meta.Bck
Expand Down
2 changes: 1 addition & 1 deletion ext/etl/comm_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ var _ = Describe("CommunicatorTest", func() {
Expect(err).NotTo(HaveOccurred())
}))
targetServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
err := comm.InlineTransform(w, r, clusterBck, objName)
err := comm.InlineTransform(w, r, lom)
Expect(err).NotTo(HaveOccurred())
}))
proxyServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down
96 changes: 40 additions & 56 deletions ext/etl/communicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,16 @@ type (
// InlineTransform uses one of the two ETL container endpoints:
// - Method "PUT", Path "/"
// - Method "GET", Path "/bucket/object"
InlineTransform(w http.ResponseWriter, r *http.Request, bck *meta.Bck, objName string) error
InlineTransform(w http.ResponseWriter, r *http.Request, lom *core.LOM) error

// OfflineTransform is driven by `OfflineDP` and performs offline transformation.
// Implementations include:
// - pushComm
// - redirectComm
// - revProxyComm
// Not to be confused with InlineTransform: on-the-fly transformation performed while handling a user's (e.g., a training model's) GET request.
OfflineTransform(lom *core.LOM, timeout time.Duration) (cos.ReadCloseSizer, error)

// OfflineTransform interface implementations realize offline ETL.
// OfflineTransform is driven by `OfflineDP` - not to be confused
// with GET requests from users (such as training models and apps)
// to perform on-the-fly transformation.
OfflineTransform(bck *meta.Bck, objName string, timeout time.Duration) (cos.ReadCloseSizer, error)
Stop()

CommStats
Expand Down Expand Up @@ -195,17 +198,17 @@ func (c *baseComm) getWithTimeout(url string, size int64, timeout time.Duration)
// pushComm: implements (Hpush | HpushStdin)
//////////////

func (pc *pushComm) doRequest(bck *meta.Bck, lom *core.LOM, timeout time.Duration) (r cos.ReadCloseSizer, err error) {
var ecode int
if err := lom.InitBck(bck.Bucket()); err != nil {
func (pc *pushComm) doRequest(lom *core.LOM, timeout time.Duration) (r cos.ReadCloseSizer, err error) {
if err := lom.InitBck(lom.Bucket()); err != nil {
return nil, err
}

var ecode int
lom.Lock(false)
r, ecode, err = pc.do(lom, timeout)
lom.Unlock(false)

if err != nil && cos.IsNotExist(err, ecode) && bck.IsRemote() {
if err != nil && cos.IsNotExist(err, ecode) && lom.Bucket().IsRemote() {
_, err = core.T.GetCold(context.Background(), lom, cmn.OwtGetLock)
if err != nil {
return nil, err
Expand Down Expand Up @@ -304,10 +307,8 @@ finish:
return cos.NewReaderWithArgs(args), 0, nil
}

func (pc *pushComm) InlineTransform(w http.ResponseWriter, _ *http.Request, bck *meta.Bck, objName string) error {
lom := core.AllocLOM(objName)
r, err := pc.doRequest(bck, lom, 0 /*timeout*/)
core.FreeLOM(lom)
func (pc *pushComm) InlineTransform(w http.ResponseWriter, _ *http.Request, lom *core.LOM) error {
r, err := pc.doRequest(lom, 0 /*timeout*/)
if err != nil {
return err
}
Expand All @@ -327,89 +328,78 @@ func (pc *pushComm) InlineTransform(w http.ResponseWriter, _ *http.Request, bck
return err
}

func (pc *pushComm) OfflineTransform(bck *meta.Bck, objName string, timeout time.Duration) (r cos.ReadCloseSizer, err error) {
lom := core.AllocLOM(objName)
r, err = pc.doRequest(bck, lom, timeout)
func (pc *pushComm) OfflineTransform(lom *core.LOM, timeout time.Duration) (r cos.ReadCloseSizer, err error) {
clone := *lom
r, err = pc.doRequest(&clone, timeout)
if err == nil && cmn.Rom.FastV(5, cos.SmoduleETL) {
nlog.Infoln(Hpush, lom.Cname(), err)
nlog.Infoln(Hpush, clone.Cname(), err)
}
core.FreeLOM(lom)
return
}

//////////////////
// redirectComm: implements Hpull
//////////////////

func (rc *redirectComm) InlineTransform(w http.ResponseWriter, r *http.Request, bck *meta.Bck, objName string) error {
func (rc *redirectComm) InlineTransform(w http.ResponseWriter, r *http.Request, lom *core.LOM) error {
if err := rc.boot.xctn.AbortErr(); err != nil {
return err
}

lom := core.AllocLOM(objName)
size, err := lomLoad(lom, bck)
size, err := lomLoad(lom)
if err != nil {
core.FreeLOM(lom)
return err
}
if size > 0 {
rc.boot.xctn.OutObjsAdd(1, size)
}

http.Redirect(w, r, rc.redirectURL(lom), http.StatusTemporaryRedirect)

if cmn.Rom.FastV(5, cos.SmoduleETL) {
nlog.Infoln(Hpull, lom.Cname())
}
core.FreeLOM(lom)
return nil
}

func (rc *redirectComm) redirectURL(lom *core.LOM) string {
switch rc.boot.msg.ArgTypeX {
case ArgTypeDefault, ArgTypeURL:
return cos.JoinPath(rc.boot.uri, transformerPath(lom.Bck(), lom.ObjName))
return cos.JoinPath(rc.boot.uri, transformerPath(lom))
case ArgTypeFQN:
return cos.JoinPath(rc.boot.uri, url.PathEscape(lom.FQN))
}
cos.Assert(false) // is validated at construction time
return ""
}

func (rc *redirectComm) OfflineTransform(bck *meta.Bck, objName string, timeout time.Duration) (cos.ReadCloseSizer, error) {
lom := core.AllocLOM(objName)
size, errV := lomLoad(lom, bck)
func (rc *redirectComm) OfflineTransform(lom *core.LOM, timeout time.Duration) (cos.ReadCloseSizer, error) {
clone := *lom
size, errV := lomLoad(&clone)
if errV != nil {
core.FreeLOM(lom)
return nil, errV
}

etlURL := rc.redirectURL(lom)
etlURL := rc.redirectURL(&clone)
r, err := rc.getWithTimeout(etlURL, size, timeout)

if cmn.Rom.FastV(5, cos.SmoduleETL) {
nlog.Infoln(Hpull, lom.Cname(), err)
nlog.Infoln(Hpull, clone.Cname(), err)
}
core.FreeLOM(lom)
return r, err
}

//////////////////
// revProxyComm: implements Hrev
//////////////////

func (rp *revProxyComm) InlineTransform(w http.ResponseWriter, r *http.Request, bck *meta.Bck, objName string) error {
lom := core.AllocLOM(objName)
size, err := lomLoad(lom, bck)
func (rp *revProxyComm) InlineTransform(w http.ResponseWriter, r *http.Request, lom *core.LOM) error {
size, err := lomLoad(lom)
if err != nil {
core.FreeLOM(lom)
return err
}
if size > 0 {
rp.boot.xctn.OutObjsAdd(1, size)
}
path := transformerPath(bck, objName)
core.FreeLOM(lom)
path := transformerPath(lom)

r.URL.Path, _ = url.PathUnescape(path) // `Path` must be unescaped; otherwise, it will be escaped again.
r.URL.RawPath = path // `RawPath` should be the escaped version of `Path`.
Expand All @@ -418,20 +408,18 @@ func (rp *revProxyComm) InlineTransform(w http.ResponseWriter, r *http.Request,
return nil
}

func (rp *revProxyComm) OfflineTransform(bck *meta.Bck, objName string, timeout time.Duration) (cos.ReadCloseSizer, error) {
lom := core.AllocLOM(objName)
size, errV := lomLoad(lom, bck)
func (rp *revProxyComm) OfflineTransform(lom *core.LOM, timeout time.Duration) (cos.ReadCloseSizer, error) {
clone := *lom
size, errV := lomLoad(&clone)
if errV != nil {
core.FreeLOM(lom)
return nil, errV
}
etlURL := cos.JoinPath(rp.boot.uri, transformerPath(bck, objName))
etlURL := cos.JoinPath(rp.boot.uri, transformerPath(&clone))
r, err := rp.getWithTimeout(etlURL, size, timeout)

if cmn.Rom.FastV(5, cos.SmoduleETL) {
nlog.Infoln(Hrev, lom.Cname(), err)
nlog.Infoln(Hrev, clone.Cname(), err)
}
core.FreeLOM(lom)
return r, err
}

Expand Down Expand Up @@ -467,21 +455,17 @@ func pruneQuery(rawQuery string) string {
// - url.PathEscape(uname) - see below - versus
// - Bck().Name + "/" + lom.ObjName - see pushComm above - versus
// - bck.AddToQuery() elsewhere
func transformerPath(bck *meta.Bck, objName string) string {
uname := bck.MakeUname(objName)
return "/" + url.PathEscape(cos.UnsafeS(uname))
func transformerPath(lom *core.LOM) string {
return "/" + url.PathEscape(lom.Uname())
}

func lomLoad(lom *core.LOM, bck *meta.Bck) (size int64, err error) {
if err = lom.InitBck(bck.Bucket()); err != nil {
return
}
func lomLoad(lom *core.LOM) (size int64, err error) {
if err = lom.Load(true /*cacheIt*/, false /*locked*/); err != nil {
if cos.IsNotExist(err, 0) && bck.IsRemote() {
if cos.IsNotExist(err, 0) && lom.Bucket().IsRemote() {
err = nil // NOTE: size == 0
}
} else {
size = lom.SizeBytes()
}
return
return size, err
}
2 changes: 1 addition & 1 deletion ext/etl/dp.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (dp *OfflineDP) Reader(lom *core.LOM, latestVer, sync bool) (cos.ReadOpenCl
)
debug.Assert(!latestVer && !sync, "NIY") // TODO -- FIXME
call := func() (int, error) {
r, err = dp.comm.OfflineTransform(lom.Bck(), lom.ObjName, dp.requestTimeout)
r, err = dp.comm.OfflineTransform(lom, dp.requestTimeout)
return 0, err
}
// TODO: Check if ETL pod is healthy and wait some more if not (yet).
Expand Down

0 comments on commit cdbb828

Please sign in to comment.