Skip to content

Commit

Permalink
core: further isolate access to LOM internals
Browse files Browse the repository at this point in the history
* lom.FQN

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jun 21, 2024
1 parent cf211ab commit f882ef4
Show file tree
Hide file tree
Showing 19 changed files with 130 additions and 100 deletions.
6 changes: 3 additions & 3 deletions ais/backend/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (s3bp *s3bp) GetBucketInv(bck *meta.Bck, ctx *core.LsoInvCtx) (int, error)
ctx.Lom = lom
mtime, usable := checkInvLom(csv.mtime, ctx)
if usable {
if ctx.Lmfh, err = ctx.Lom.OpenFile(); err != nil {
if ctx.Lmfh, err = ctx.Lom.Open(); err != nil {
lom.Unlock(false)
core.FreeLOM(lom)
ctx.Lom = nil
Expand Down Expand Up @@ -208,7 +208,7 @@ func (s3bp *s3bp) GetBucketInv(bck *meta.Bck, ctx *core.LsoInvCtx) (int, error)
lom.Unlock(true)
lom.Lock(false)

if ctx.Lmfh, err = ctx.Lom.OpenFile(); err != nil {
if ctx.Lmfh, err = ctx.Lom.Open(); err != nil {
lom.Unlock(false)
core.FreeLOM(lom)
ctx.Lom = nil
Expand All @@ -235,7 +235,7 @@ func (s3bp *s3bp) GetBucketInv(bck *meta.Bck, ctx *core.LsoInvCtx) (int, error)
}

lom.Lock(false) // must succeed
if ctx.Lmfh, err = ctx.Lom.OpenFile(); err != nil {
if ctx.Lmfh, err = ctx.Lom.Open(); err != nil {
lom.Unlock(false)
core.FreeLOM(lom)
ctx.Lom = nil
Expand Down
4 changes: 2 additions & 2 deletions ais/backend/awsinv.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (s3bp *s3bp) getInventory(cloudBck *cmn.Bck, ctx *core.LsoInvCtx, csv invT)
lom.SetSize(csv.size)

wfqn := fs.CSM.Gen(ctx.Lom, fs.WorkfileType, "")
wfh, err := ctx.Lom.CreateFile(wfqn)
wfh, err := ctx.Lom.CreateWork(wfqn)
if err != nil {
return _errInv("create-file", err)
}
Expand Down Expand Up @@ -248,7 +248,7 @@ func (s3bp *s3bp) getInventory(cloudBck *cmn.Bck, ctx *core.LsoInvCtx, csv invT)
// finalize (NOTE a lighter version of FinalizeObj - no redundancy, no locks)
if err == nil {
lom := ctx.Lom
if err = lom.RenameFrom(wfqn); err == nil {
if err = lom.RenameFinalize(wfqn); err == nil {
if err = os.Chtimes(lom.FQN, csv.mtime, csv.mtime); err == nil {
nlog.Infoln("new", invTag+":", lom.Cname(), ctx.Schema)

Expand Down
6 changes: 3 additions & 3 deletions ais/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -1374,7 +1374,7 @@ func (t *target) delobj(lom *core.LOM, evict bool) (int, error, bool) {
}
if delFromAIS {
size := lom.SizeBytes()
aisErr = lom.Remove()
aisErr = lom.RemoveObj()
if aisErr != nil {
if !os.IsNotExist(aisErr) {
if backendErr != nil {
Expand Down Expand Up @@ -1430,7 +1430,7 @@ func (t *target) objMv(lom *core.LOM, msg *apc.ActMsg) (err error) {

// TODO: combine copy+delete under a single write lock
lom.Lock(true)
if err := lom.Remove(); err != nil {
if err := lom.RemoveObj(); err != nil {
nlog.Warningf("%s: failed to delete renamed object %s (new name %s): %v", t, lom, msg.Name, err)
}
lom.Unlock(true)
Expand Down Expand Up @@ -1487,7 +1487,7 @@ func _blobdl(params *core.BlobParams, oa *cmn.ObjAttrs) (string, *xs.XactBlobDl,
if params.WriteSGL == nil {
// regular lom save (custom writer not present)
wfqn := fs.CSM.Gen(params.Lom, fs.WorkfileType, "blob-dl")
lmfh, err := params.Lom.CreateFile(wfqn)
lmfh, err := params.Lom.CreateWork(wfqn)
if err != nil {
return "", nil, err
}
Expand Down
14 changes: 7 additions & 7 deletions ais/tgtfcold.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ func (goi *getOI) coldReopen(res *core.GetReaderResult) error {
)
if goi.verchanged {
revert = fs.CSM.Gen(lom, fs.WorkfileType, fs.WorkfileColdget)
if err := os.Rename(lom.FQN, revert); err != nil {
if err := lom.RenameMainTo(revert); err != nil {
nlog.Errorln("failed to rename prev. version - proceeding anyway", lom.FQN, "=>", revert)
revert = ""
}
}
lmfh, err := lom.CreateFile(lom.FQN)
lmfh, err := lom.Create()
if err != nil {
cos.Close(res.R)
goi._cleanup(revert, nil, nil, nil, err, "(fcreate)")
Expand Down Expand Up @@ -88,7 +88,7 @@ func (goi *getOI) coldReopen(res *core.GetReaderResult) error {
}

// reopen & transmit ---
if lmfh, err = lom.OpenFile(); err != nil {
if lmfh, err = lom.Open(); err != nil {
goi._cleanup(revert, nil, buf, slab, err, "(seek)")
return err
}
Expand Down Expand Up @@ -176,9 +176,9 @@ func (goi *getOI) _cleanup(revert string, lmfh *os.File, buf []byte, slab *memsy
slab.Free(buf)
}
if err != nil {
goi.lom.Remove()
goi.lom.RemoveObj()
if revert != "" {
if errV := os.Rename(revert, goi.lom.FQN); errV != nil {
if errV := goi.lom.RenameToMain(revert); errV != nil {
nlog.Infoln(ftcg+tag+"(revert)", errV)
}
}
Expand All @@ -197,12 +197,12 @@ func (goi *getOI) coldStream(res *core.GetReaderResult) error {
)
if goi.verchanged {
revert = fs.CSM.Gen(lom, fs.WorkfileType, fs.WorkfileColdget)
if err := os.Rename(lom.FQN, revert); err != nil {
if err := lom.RenameMainTo(revert); err != nil {
nlog.Errorln("failed to rename prev. version - proceeding anyway", lom.FQN, "=>", revert)
revert = ""
}
}
lmfh, err := lom.CreateFile(lom.FQN)
lmfh, err := lom.Create()
if err != nil {
cos.Close(res.R)
goi._cleanup(revert, nil, nil, nil, err, "(fcreate)")
Expand Down
27 changes: 13 additions & 14 deletions ais/tgtobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (poi *putOI) fini() (ecode int, err error) {
// (see GetCold() implementation and cmn.OWT enum)
switch poi.owt {
case cmn.OwtGetTryLock, cmn.OwtGetLock, cmn.OwtGet:
debug.AssertFunc(func() bool { _, exclusive := lom.IsLocked(); return exclusive })
// do nothing: lom is already wlocked
case cmn.OwtGetPrefetchLock:
if !lom.TryLock(true) {
if cmn.Rom.FastV(4, cos.SmoduleAIS) {
Expand Down Expand Up @@ -334,7 +334,7 @@ func (poi *putOI) fini() (ecode int, err error) {
}

// done
if err = lom.RenameFrom(poi.workFQN); err != nil {
if err = lom.RenameFinalize(poi.workFQN); err != nil {
return
}
if lom.HasCopies() {
Expand Down Expand Up @@ -385,7 +385,7 @@ func (poi *putOI) write() (buf []byte, slab *memsys.Slab, lmfh *os.File, err err
}{}
ckconf = poi.lom.CksumConf()
)
if lmfh, err = poi.lom.CreateFile(poi.workFQN); err != nil {
if lmfh, err = poi.lom.CreateWork(poi.workFQN); err != nil {
return
}
if poi.size <= 0 {
Expand Down Expand Up @@ -772,15 +772,15 @@ validate:
//
// TODO: mark `deleted` and postpone actual deletion
//
if erl := lom.Remove(true /*force through rlock*/); erl != nil {
if erl := lom.RemoveObj(true /*force through rlock*/); erl != nil {
nlog.Warningf("%s: failed to remove corrupted %s, err: %v", goi.t, lom, erl)
}
return
}
//
// try to recover from BAD CHECKSUM
//
cos.RemoveFile(lom.FQN) // TODO: ditto
lom.RemoveMain()

if lom.HasCopies() {
retried = true
Expand All @@ -797,7 +797,7 @@ validate:
if lom.ECEnabled() {
retried = true
goi.lom.Unlock(false)
cos.RemoveFile(lom.FQN)
lom.RemoveMain()
_, code, err = goi.restoreFromAny(true /*skipLomRestore*/)
goi.lom.Lock(false)
if err == nil {
Expand All @@ -808,7 +808,7 @@ validate:
}

// TODO: ditto
if erl := lom.Remove(true /*force through rlock*/); erl != nil {
if erl := lom.RemoveObj(true /*force through rlock*/); erl != nil {
nlog.Warningf("%s: failed to remove corrupted %s, err: %v", goi.t, lom, erl)
}
return
Expand Down Expand Up @@ -1255,7 +1255,7 @@ func (a *apndOI) apnd(buf []byte) (packedHdl string, ecode int, err error) {
} else {
a.lom.Unlock(false)
a.hdl.partialCksum = cos.NewCksumHash(a.lom.CksumType())
fh, err = a.lom.CreateFile(workFQN)
fh, err = a.lom.CreateWork(workFQN)
}
} else {
fh, err = os.OpenFile(workFQN, os.O_APPEND|os.O_WRONLY, cos.PermRWR)
Expand Down Expand Up @@ -1655,12 +1655,12 @@ func (a *putA2I) do() (int, error) {
tarFormat tar.Format
workFQN = fs.CSM.Gen(a.lom, fs.WorkfileType, fs.WorkfileAppendToArch)
)
if err = os.Rename(a.lom.FQN, workFQN); err != nil {
if err = a.lom.RenameMainTo(workFQN); err != nil {
return http.StatusInternalServerError, err
}
fh, tarFormat, err = archive.OpenTarSeekEnd(a.lom.Cname(), workFQN)
if err != nil {
if errV := a.lom.RenameFrom(workFQN); errV != nil {
if errV := a.lom.RenameToMain(workFQN); errV != nil {
return http.StatusInternalServerError, errV
}
if err == archive.ErrTarIsEmpty {
Expand All @@ -1675,8 +1675,7 @@ func (a *putA2I) do() (int, error) {
if err = a.finalize(size, cos.NoneCksum, workFQN); err == nil {
return http.StatusInternalServerError, nil // ok
}
}
if errV := a.lom.RenameFrom(workFQN); errV != nil {
} else if errV := a.lom.RenameToMain(workFQN); errV != nil {
nlog.Errorf(fmtNested, a.t, err, "append and rename back", workFQN, errV)
}
return http.StatusInternalServerError, err
Expand Down Expand Up @@ -1705,7 +1704,7 @@ cpap: // copy + append
aw.Fini()
} else {
// copy + append
lmfh, err = a.lom.OpenFile()
lmfh, err = a.lom.Open()
if err != nil {
cos.Close(wfh)
return http.StatusNotFound, err
Expand Down Expand Up @@ -1771,7 +1770,7 @@ func (a *putA2I) finalize(size int64, cksum *cos.Cksum, fqn string) error {
debug.Assertf(finfo.Size() == size, "%d != %d", finfo.Size(), size)
})
// done
if err := a.lom.RenameFrom(fqn); err != nil {
if err := a.lom.RenameFinalize(fqn); err != nil {
return err
}
a.lom.SetSize(size)
Expand Down
10 changes: 5 additions & 5 deletions ais/tgtobj_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func BenchmarkObjPut(b *testing.B) {
workFQN: path.Join(testMountpath, "objname.work"),
config: cmn.GCO.Get(),
}
os.Remove(lom.FQN)
lom.RemoveMain()
b.StartTimer()

_, err := poi.putObject()
Expand All @@ -132,7 +132,7 @@ func BenchmarkObjPut(b *testing.B) {
}
}
b.StopTimer()
os.Remove(lom.FQN)
lom.RemoveMain()
})
}
}
Expand Down Expand Up @@ -173,7 +173,7 @@ func BenchmarkObjAppend(b *testing.B) {
op: apc.AppendOp,
hdl: hdl,
}
os.Remove(lom.FQN)
lom.RemoveMain()
b.StartTimer()

newHandle, _, err := aoi.apnd(buf)
Expand All @@ -186,7 +186,7 @@ func BenchmarkObjAppend(b *testing.B) {
}
}
b.StopTimer()
os.Remove(lom.FQN)
lom.RemoveMain()
os.Remove(hdl.workFQN)
})
}
Expand Down Expand Up @@ -261,7 +261,7 @@ func BenchmarkObjGetDiscard(b *testing.B) {
}

b.StopTimer()
os.Remove(lom.FQN)
lom.RemoveMain()
})
}
}
6 changes: 3 additions & 3 deletions ais/tgts3mpt.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (t *target) putMptPart(w http.ResponseWriter, r *http.Request, items []stri
// workfile name format: <upload-id>.<part-number>.<obj-name>
prefix := uploadID + "." + strconv.FormatInt(int64(partNum), 10)
wfqn := fs.CSM.Gen(lom, fs.WorkfileType, prefix)
partFh, errC := lom.CreateFileRW(wfqn)
partFh, errC := lom.CreateWorkRW(wfqn)
if errC != nil {
s3.WriteMptErr(w, r, errC, 0, lom, uploadID)
return
Expand Down Expand Up @@ -274,7 +274,7 @@ func (t *target) completeMpt(w http.ResponseWriter, r *http.Request, items []str
// 2. <upload-id>.complete.<obj-name>
prefix := uploadID + ".complete"
wfqn := fs.CSM.Gen(lom, fs.WorkfileType, prefix)
wfh, errC := lom.CreateFile(wfqn)
wfh, errC := lom.CreateWork(wfqn)
if errC != nil {
s3.WriteMptErr(w, r, errC, 0, lom, uploadID)
return
Expand Down Expand Up @@ -493,7 +493,7 @@ func (t *target) getMptPart(w http.ResponseWriter, r *http.Request, bck *meta.Bc
if err != nil {
s3.WriteErr(w, r, err, status)
}
fh, err := lom.OpenFile()
fh, err := lom.Open()
if err != nil {
s3.WriteErr(w, r, err, 0)
return
Expand Down
16 changes: 6 additions & 10 deletions core/lcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,10 @@ func (lom *LOM) HasCopies() bool { return len(lom.md.copies) > 1 }
func (lom *LOM) NumCopies() int { return max(len(lom.md.copies), 1) } // metadata-wise

// GetCopies returns all copies
// NOTE: a) copies include lom.FQN aka "main repl.", and b) caller must take a lock
// - copies include lom.FQN aka "main repl."
// - caller must take a lock
func (lom *LOM) GetCopies() fs.MPI {
debug.AssertFunc(func() bool {
rc, exclusive := lom.IsLocked()
return exclusive || rc > 0
})
debug.Assert(lom.isLockedRW(), lom.Cname())
return lom.md.copies
}

Expand Down Expand Up @@ -145,11 +143,9 @@ func (lom *LOM) syncMetaWithCopies() (err error) {
if !lom.HasCopies() {
return nil
}
// NOTE: caller is responsible for write-locking
debug.AssertFunc(func() bool {
_, exclusive := lom.IsLocked()
return exclusive
})
// caller is responsible for write-locking
debug.Assert(lom.isLockedExcl(), lom.Cname())

if !lom.WritePolicy().IsImmediate() {
lom.md.makeDirty()
return nil
Expand Down
10 changes: 4 additions & 6 deletions core/ldp.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (lom *LOM) CheckRemoteMD(locked, sync bool, origReq *http.Request) (res CRM

// rm remotely-deleted
if cos.IsNotExist(err, ecode) && (lom.VersionConf().Sync || sync) {
errDel := lom.Remove(locked /*force through rlock*/)
errDel := lom.RemoveObj(locked /*force through rlock*/)
if errDel != nil {
ecode, err = 0, errDel
} else {
Expand All @@ -155,12 +155,10 @@ func (lom *LOM) CheckRemoteMD(locked, sync bool, origReq *http.Request) (res CRM
return CRMD{ErrCode: ecode, Err: err}
}

// NOTE: must be locked; NOTE: Sync == false (ie., not deleting)
// NOTE: Sync is false (ie., not deleting)
func (lom *LOM) LoadLatest(latest bool) (oa *cmn.ObjAttrs, deleted bool, err error) {
debug.AssertFunc(func() bool {
rc, exclusive := lom.IsLocked()
return exclusive || rc > 0
})
debug.Assert(lom.isLockedRW(), lom.Cname()) // caller must take a lock

err = lom.Load(true /*cache it*/, true /*locked*/)
if err != nil {
if !cmn.IsErrObjNought(err) || !latest {
Expand Down
Loading

0 comments on commit f882ef4

Please sign in to comment.