Skip to content

Commit

Permalink
core: further isolate access to LOM internals; fstat
Browse files Browse the repository at this point in the history
* prev. commit: f882ef4

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jun 23, 2024
1 parent 57b9458 commit 397e1e2
Show file tree
Hide file tree
Showing 43 changed files with 143 additions and 146 deletions.
2 changes: 1 addition & 1 deletion ais/backend/ais.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ func (m *AISbp) PutObj(r io.ReadCloser, lom *core.LOM, _ *http.Request) (ecode i
return
}
unsetUUID(&remoteBck)
size := lom.SizeBytes(true) // _special_ as it's still a workfile at this point
size := lom.Lsize(true) // _special_ as it's still a workfile at this point
args := api.PutArgs{
BaseParams: remAis.bp,
Bck: remoteBck,
Expand Down
39 changes: 18 additions & 21 deletions ais/backend/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,28 +194,25 @@ func (s3bp *s3bp) GetBucketInv(bck *meta.Bck, ctx *core.LsoInvCtx) (int, error)

// acquired wlock: check for write/write race

finfo, err := os.Stat(ctx.Lom.FQN)
if err == nil {
newMtime := finfo.ModTime()
if newMtime.Sub(mtime) > time.Hour {
// updated by smbd else
// reload the lom and return
ctx.Lom.Uncache()
_, usable = checkInvLom(newMtime, ctx)
debug.Assert(usable)

// wlock --> rlock must succeed
lom.Unlock(true)
lom.Lock(false)

if ctx.Lmfh, err = ctx.Lom.Open(); err != nil {
lom.Unlock(false)
core.FreeLOM(lom)
ctx.Lom = nil
return 0, _errInv("reload-inv-open", err)
}
return 0, nil // ok
_, _, newMtime, err := ctx.Lom.Fstat(false /*get-atime*/)
if err == nil && newMtime.Sub(mtime) > time.Hour {
// updated by smbd else
// reload the lom and return
ctx.Lom.Uncache()
_, usable = checkInvLom(newMtime, ctx)
debug.Assert(usable)

// wlock --> rlock must succeed
lom.Unlock(true)
lom.Lock(false)

if ctx.Lmfh, err = ctx.Lom.Open(); err != nil {
lom.Unlock(false)
core.FreeLOM(lom)
ctx.Lom = nil
return 0, _errInv("reload-inv-open", err)
}
return 0, nil // ok
}

// still under wlock: cleanup old, read and write as ctx.Lom
Expand Down
12 changes: 5 additions & 7 deletions ais/backend/awsinv.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,27 +165,25 @@ func cleanupOldInventory(cloudBck *cmn.Bck, svc *s3.Client, lsV2resp *s3.ListObj
}

func checkInvLom(latest time.Time, ctx *core.LsoInvCtx) (time.Time, bool) {
finfo, err := os.Stat(ctx.Lom.FQN)
size, _, mtime, err := ctx.Lom.Fstat(false /*get-atime*/)
if err != nil {
debug.Assert(os.IsNotExist(err), err)
nlog.Infoln(invTag, "does not exist, getting a new one for the timestamp:", latest)
return time.Time{}, false
}

if cmn.Rom.FastV(5, cos.SmoduleBackend) {
nlog.Infoln(core.T.String(), "checking", ctx.Lom.String(), ctx.Lom.FQN, ctx.Lom.HrwFQN)
}
mtime := finfo.ModTime()
abs := _sinceAbs(mtime, latest)
if abs < time.Second {
debug.Assert(ctx.Size == 0 || ctx.Size == finfo.Size())
ctx.Size = finfo.Size()
debug.Assert(ctx.Size == 0 || ctx.Size == size)
ctx.Size = size

// start (or rather, keep) using this one
errN := ctx.Lom.Load(true, true)
debug.AssertNoErr(errN)
debug.Assert(ctx.Lom.SizeBytes() == finfo.Size(), ctx.Lom.SizeBytes(), finfo.Size())
// TODO -- FIXME: revisit
// debug.Assert(_sinceAbs(mtime, ctx.Lom.Atime()) < time.Second, mtime.String(), ctx.Lom.Atime().String())
debug.Assert(ctx.Lom.Lsize() == size, ctx.Lom.Lsize(), size)
return time.Time{}, true
}

Expand Down
2 changes: 1 addition & 1 deletion ais/backend/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ func (azbp *azbp) PutObj(r io.ReadCloser, lom *core.LOM, _ *http.Request) (int,
cloudBck := lom.Bck().RemoteBck()

opts := azblob.UploadStreamOptions{}
if size := lom.SizeBytes(true); size > cos.MiB {
if size := lom.Lsize(true); size > cos.MiB {
opts.Concurrency = int(min((size+cos.MiB-1)/cos.MiB, 8))
}

Expand Down
2 changes: 1 addition & 1 deletion ais/s3/presigned.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (pts *PresignedReq) DoReader(client *http.Client) (*PresignedResp, error) {
nreq.ContentLength = pts.oreq.ContentLength
if nreq.ContentLength == -1 {
debug.Assert(false) // FIXME: remove, or catch in debug mode
nreq.ContentLength = pts.lom.SizeBytes()
nreq.ContentLength = pts.lom.Lsize()
}
}

Expand Down
2 changes: 1 addition & 1 deletion ais/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -1373,7 +1373,7 @@ func (t *target) delobj(lom *core.LOM, evict bool) (int, error, bool) {
backendErrCode, backendErr = t.Backend(lom.Bck()).DeleteObj(lom)
}
if delFromAIS {
size := lom.SizeBytes()
size := lom.Lsize()
aisErr = lom.RemoveObj()
if aisErr != nil {
if !os.IsNotExist(aisErr) {
Expand Down
6 changes: 3 additions & 3 deletions ais/tgtfcold.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,13 @@ func (goi *getOI) coldReopen(res *core.GetReaderResult) error {
}
var (
hrng *htrange
size = lom.SizeBytes()
size = lom.Lsize()
reader io.Reader = lmfh
whdr = goi.w.Header()
)
if goi.ranges.Range != "" {
// (not here if range checksum enabled)
rsize := goi.lom.SizeBytes()
rsize := goi.lom.Lsize()
if goi.ranges.Size > 0 {
rsize = goi.ranges.Size
}
Expand Down Expand Up @@ -164,7 +164,7 @@ func (goi *getOI) _fini(revert string, fullSize, txSize int64) error {
nlog.InfoDepth(1, ftcg+"(load)", lom, err) // (unlikely)
return errSendingResp
}
debug.Assert(lom.SizeBytes() == fullSize)
debug.Assert(lom.Lsize() == fullSize)
goi.lom.Unlock(true)

// regular get stats
Expand Down
4 changes: 2 additions & 2 deletions ais/tgtimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (t *target) PutObject(lom *core.LOM, params *core.PutParams) error {
}
_, err := poi.putObject()
freePOI(poi)
debug.Assert(err != nil || params.Size <= 0 || params.Size == lom.SizeBytes(true), lom.String(), params.Size, lom.SizeBytes(true))
debug.Assert(err != nil || params.Size <= 0 || params.Size == lom.Lsize(true), lom.String(), params.Size, lom.Lsize(true))
return err
}

Expand Down Expand Up @@ -196,7 +196,7 @@ func (t *target) GetCold(ctx context.Context, lom *core.LOM, owt cmn.OWT) (ecode
// 4. stats
t.statsT.AddMany(
cos.NamedVal64{Name: stats.GetColdCount, Value: 1},
cos.NamedVal64{Name: stats.GetColdSize, Value: lom.SizeBytes()},
cos.NamedVal64{Name: stats.GetColdSize, Value: lom.Lsize()},
cos.NamedVal64{Name: stats.GetColdRwLatency, Value: mono.SinceNano(now)},
)
return 0, nil
Expand Down
28 changes: 14 additions & 14 deletions ais/tgtobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (poi *putOI) putObject() (ecode int, err error) {
// same-checksum-skip-writing, on the other
if poi.owt == cmn.OwtPut && poi.restful {
debug.Assert(cos.IsValidAtime(poi.atime), poi.atime)
size := poi.lom.SizeBytes()
size := poi.lom.Lsize()
poi.t.statsT.AddMany(
cos.NamedVal64{Name: stats.PutCount, Value: 1},
cos.NamedVal64{Name: stats.PutSize, Value: size},
Expand All @@ -208,7 +208,7 @@ func (poi *putOI) putObject() (ecode int, err error) {
}
} else if poi.xctn != nil && poi.owt == cmn.OwtPromote {
// xaction in-objs counters, promote first
poi.xctn.InObjsAdd(1, poi.lom.SizeBytes())
poi.xctn.InObjsAdd(1, poi.lom.Lsize())
}
if cmn.Rom.FastV(5, cos.SmoduleAIS) {
nlog.Infoln(poi.loghdr())
Expand Down Expand Up @@ -450,7 +450,7 @@ func (poi *putOI) write() (buf []byte, slab *memsys.Slab, lmfh cos.LomWriter, er
cos.Close(lmfh)
lmfh = nil

poi.lom.SetSize(written) // TODO: compare with non-zero lom.SizeBytes() that may have been set via oa.FromHeader()
poi.lom.SetSize(written) // TODO: compare with non-zero lom.Lsize() that may have been set via oa.FromHeader()
if cksums.store != nil {
if !cksums.finalized {
cksums.store.Finalize()
Expand Down Expand Up @@ -988,7 +988,7 @@ func (goi *getOI) txfini() (ecode int, err error) {
switch {
case goi.ranges.Range != "":
debug.Assert(!dpq.isArch())
rsize := goi.lom.SizeBytes()
rsize := goi.lom.Lsize()
if goi.ranges.Size > 0 {
rsize = goi.ranges.Size
}
Expand Down Expand Up @@ -1050,7 +1050,7 @@ func (goi *getOI) _txreg(fqn string, lmfh *os.File, whdr http.Header) (err error
dpq = goi.dpq
lom = goi.lom
cksum = lom.Checksum()
size = lom.SizeBytes()
size = lom.Lsize()
)
// set response header
whdr.Set(cos.HdrContentType, cos.ContentBinary)
Expand All @@ -1077,7 +1077,7 @@ func (goi *getOI) _txarch(fqn string, lmfh *os.File, whdr http.Header) error {
if err != nil {
return err
}
ar, err = archive.NewReader(mime, lmfh, lom.SizeBytes())
ar, err = archive.NewReader(mime, lmfh, lom.Lsize())
if err != nil {
return fmt.Errorf("failed to open %s: %w", lom.Cname(), err)
}
Expand Down Expand Up @@ -1161,7 +1161,7 @@ func (goi *getOI) stats(written int64) {
if goi.verchanged {
goi.t.statsT.AddMany(
cos.NamedVal64{Name: stats.VerChangeCount, Value: 1},
cos.NamedVal64{Name: stats.VerChangeSize, Value: goi.lom.SizeBytes()},
cos.NamedVal64{Name: stats.VerChangeSize, Value: goi.lom.Lsize()},
)
}
}
Expand Down Expand Up @@ -1393,7 +1393,7 @@ func (coi *copyOI) _dryRun(lom *core.LOM, objnameTo string) (size int64, err err
if coi.DP == nil {
uname := coi.BckTo.MakeUname(objnameTo)
if lom.Uname() != cos.UnsafeS(uname) {
size = lom.SizeBytes()
size = lom.Lsize()
}
return size, nil
}
Expand Down Expand Up @@ -1448,7 +1448,7 @@ func (coi *copyOI) _reader(t *target, dm *bundle.DataMover, lom, dst *core.LOM)
freePOI(poi)
if err == nil {
// xaction stats: inc locally processed (and see data mover for in and out objs)
size = oah.SizeBytes()
size = oah.Lsize()
}
return size, ecode, err
}
Expand Down Expand Up @@ -1482,7 +1482,7 @@ func (coi *copyOI) _regular(t *target, lom, dst *core.LOM) (size int64, _ error)
}
dst2, err := lom.Copy2FQN(dst.FQN, coi.Buf)
if err == nil {
size = lom.SizeBytes()
size = lom.Lsize()
if coi.Finalize {
t.putMirror(dst2)
}
Expand Down Expand Up @@ -1552,7 +1552,7 @@ func (coi *copyOI) _send(t *target, lom *core.LOM, sargs *sendArgs) (size int64,
if err != nil {
return 0, err
}
size = lom.SizeBytes()
size = lom.Lsize()
sargs.reader, sargs.objAttrs = reader, lom
default:
// 3. DP transform (possibly, no-op)
Expand All @@ -1562,7 +1562,7 @@ func (coi *copyOI) _send(t *target, lom *core.LOM, sargs *sendArgs) (size int64,
return
}
// returns cos.ContentLengthUnknown (-1) if post-transform size is unknown
size = oah.SizeBytes()
size = oah.Lsize()
sargs.reader, sargs.objAttrs = reader, oah
}

Expand Down Expand Up @@ -1602,7 +1602,7 @@ func (coi *copyOI) put(t *target, sargs *sendArgs) error {
hdr = make(http.Header, 8)
query = sargs.bckTo.NewQuery()
)
cmn.ToHeader(sargs.objAttrs, hdr, sargs.objAttrs.SizeBytes(true))
cmn.ToHeader(sargs.objAttrs, hdr, sargs.objAttrs.Lsize(true))
hdr.Set(apc.HdrT2TPutterID, t.SID())
query.Set(apc.QparamOWT, sargs.owt.ToS())
if coi.Xact != nil {
Expand Down Expand Up @@ -1713,7 +1713,7 @@ cpap: // copy + append
}
cksum.Init(a.lom.CksumType())
aw = archive.NewWriter(a.mime, wfh, &cksum, nil)
err = aw.Copy(lmfh, a.lom.SizeBytes())
err = aw.Copy(lmfh, a.lom.Lsize())
if err == nil {
err = aw.Write(a.filename, oah, a.r)
}
Expand Down
4 changes: 2 additions & 2 deletions cmn/archive/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (tw *tarWriter) Write(fullname string, oah cos.OAH, reader io.Reader) (err
hdr := tar.Header{
Typeflag: tar.TypeReg,
Name: fullname,
Size: oah.SizeBytes(),
Size: oah.Lsize(),
ModTime: time.Unix(0, oah.AtimeUnix()),
Mode: int64(cos.PermRWRR),
Format: tw.format,
Expand Down Expand Up @@ -215,7 +215,7 @@ func (zw *zipWriter) Write(fullname string, oah cos.OAH, reader io.Reader) error
ziphdr := zip.FileHeader{
Name: fullname,
Comment: fullname,
UncompressedSize64: uint64(oah.SizeBytes()),
UncompressedSize64: uint64(oah.Lsize()),
Modified: time.Unix(0, oah.AtimeUnix()),
}
zw.cb(&ziphdr)
Expand Down
6 changes: 3 additions & 3 deletions cmn/cos/oah.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ package cos

type (
OAH interface {
SizeBytes(special ...bool) int64
Lsize(special ...bool) int64
Version(special ...bool) string
VersionPtr() *string
Checksum() *Cksum
Expand All @@ -28,8 +28,8 @@ type (
// interface guard
var _ OAH = (*SimpleOAH)(nil)

func (s SimpleOAH) SizeBytes(...bool) int64 { return s.Size }
func (s SimpleOAH) AtimeUnix() int64 { return s.Atime }
func (s SimpleOAH) Lsize(...bool) int64 { return s.Size }
func (s SimpleOAH) AtimeUnix() int64 { return s.Atime }

func (SimpleOAH) Version(...bool) string { return "" }
func (SimpleOAH) VersionPtr() *string { return nil }
Expand Down
12 changes: 6 additions & 6 deletions cmn/objattrs.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ func (oa *ObjAttrs) String() string {
return fmt.Sprintf("%dB, v%q, %s, %+v", oa.Size, oa.Version(), oa.Cksum, oa.CustomMD)
}

func (oa *ObjAttrs) SizeBytes(_ ...bool) int64 { return oa.Size }
func (oa *ObjAttrs) AtimeUnix() int64 { return oa.Atime }
func (oa *ObjAttrs) Checksum() *cos.Cksum { return oa.Cksum }
func (oa *ObjAttrs) SetCksum(ty, val string) { oa.Cksum = cos.NewCksum(ty, val) }
func (oa *ObjAttrs) Lsize(_ ...bool) int64 { return oa.Size }
func (oa *ObjAttrs) AtimeUnix() int64 { return oa.Atime }
func (oa *ObjAttrs) Checksum() *cos.Cksum { return oa.Cksum }
func (oa *ObjAttrs) SetCksum(ty, val string) { oa.Cksum = cos.NewCksum(ty, val) }

func (oa *ObjAttrs) Version(_ ...bool) string {
if oa.Ver == nil {
Expand Down Expand Up @@ -160,7 +160,7 @@ func (oa *ObjAttrs) DelCustomKeys(keys ...string) {
// clone OAH => ObjAttrs (see also lom.CopyAttrs)
func (oa *ObjAttrs) CopyFrom(oah cos.OAH, skipCksum bool) {
oa.Atime = oah.AtimeUnix()
oa.Size = oah.SizeBytes()
oa.Size = oah.Lsize()
oa.CopyVersion(oah)
if !skipCksum {
oa.Cksum = oah.Checksum().Clone()
Expand Down Expand Up @@ -267,7 +267,7 @@ func (oa *ObjAttrs) Equal(rem cos.OAH) (eq bool) {
sameEtag bool
)
// size check
if remSize := rem.SizeBytes(true); oa.Size != 0 && remSize != 0 && oa.Size != remSize {
if remSize := rem.Lsize(true); oa.Size != 0 && remSize != 0 && oa.Size != remSize {
return false
}

Expand Down
6 changes: 4 additions & 2 deletions core/ct.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/core/meta"
"github.com/NVIDIA/aistore/fs"
)
Expand Down Expand Up @@ -40,11 +41,12 @@ func (ct *CT) ContentType() string { return ct.contentType }
func (ct *CT) Bck() *meta.Bck { return ct.bck }
func (ct *CT) Bucket() *cmn.Bck { return (*cmn.Bck)(ct.bck) }
func (ct *CT) Mountpath() *fs.Mountpath { return ct.mi }
func (ct *CT) SizeBytes() int64 { return ct.size }
func (ct *CT) Lsize() int64 { return ct.size }
func (ct *CT) MtimeUnix() int64 { return ct.mtime }
func (ct *CT) Digest() uint64 { return ct.digest }

func (ct *CT) LoadFromFS() error {
func (ct *CT) LoadSliceFromFS() error {
debug.Assert(ct.ContentType() == fs.ECSliceType, "unexpected content type: ", ct.ContentType())
st, err := os.Stat(ct.FQN())
if err != nil {
return err
Expand Down
Loading

0 comments on commit 397e1e2

Please sign in to comment.