Skip to content

Commit

Permalink
core: return writer, not file; add create-part, create-slice
Browse files Browse the repository at this point in the history
* part one, related commit: 3816bfb

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jun 22, 2024
1 parent cdbb828 commit 887bb05
Show file tree
Hide file tree
Showing 13 changed files with 57 additions and 52 deletions.
3 changes: 1 addition & 2 deletions ais/tgtfcold.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package ais

import (
"io"
"os"

"github.com/NVIDIA/aistore/ais/s3"
"github.com/NVIDIA/aistore/cmn"
Expand All @@ -28,7 +27,7 @@ func (goi *getOI) coldReopen(res *core.GetReaderResult) error {
var (
err error
lmfh cos.LomReader
wfh *os.File
wfh cos.LomWriter
t = goi.t
lom = goi.lom
revert string
Expand Down
4 changes: 2 additions & 2 deletions ais/tgtobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -1251,14 +1251,14 @@ func (a *apndOI) apnd(buf []byte) (packedHdl string, ecode int, err error) {
ecode = http.StatusInternalServerError
return
}
fh, err = os.OpenFile(workFQN, os.O_APPEND|os.O_WRONLY, cos.PermRWR)
fh, err = os.OpenFile(workFQN, os.O_APPEND|os.O_WRONLY, cos.PermRWR) // O_APPEND
} else {
a.lom.Unlock(false)
a.hdl.partialCksum = cos.NewCksumHash(a.lom.CksumType())
fh, err = a.lom.CreateWork(workFQN)
}
} else {
fh, err = os.OpenFile(workFQN, os.O_APPEND|os.O_WRONLY, cos.PermRWR)
fh, err = os.OpenFile(workFQN, os.O_APPEND|os.O_WRONLY, cos.PermRWR) // O_APPEND
debug.Assert(a.hdl.partialCksum != nil)
}
if err != nil { // failed to open or create
Expand Down
2 changes: 1 addition & 1 deletion ais/tgts3mpt.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (t *target) putMptPart(w http.ResponseWriter, r *http.Request, items []stri
// workfile name format: <upload-id>.<part-number>.<obj-name>
prefix := uploadID + "." + strconv.FormatInt(int64(partNum), 10)
wfqn := fs.CSM.Gen(lom, fs.WorkfileType, prefix)
partFh, errC := lom.CreateWorkRW(wfqn)
partFh, errC := lom.CreatePart(wfqn)
if errC != nil {
s3.WriteMptErr(w, r, errC, 0, lom, uploadID)
return
Expand Down
32 changes: 20 additions & 12 deletions cmn/cos/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,32 +33,30 @@ const ContentLengthUnknown = -1

const PathSeparator = string(filepath.Separator)

// readers
// follows below:
// - readers: interfaces
// - readers: implementations
// - handles
// - writers

// readers: interfaces
type (
ReadOpenCloser interface {
io.ReadCloser
Open() (ReadOpenCloser, error)
}
// ReadSizer is the interface that adds Size method to io.Reader.
ReadSizer interface {
io.Reader
Size() int64
}
// ReadCloseSizer is the interface that adds Size method to io.ReadCloser.
ReadCloseSizer interface {
io.ReadCloser
Size() int64
}
// ReadOpenCloseSizer is the interface that adds Size method to ReadOpenCloser.
ReadOpenCloseSizer interface {
ReadOpenCloseSizer interface { // see sizedReader below
ReadOpenCloser
Size() int64
}
sizedReader struct {
io.Reader
size int64
}

ReadReaderAt interface {
io.Reader
io.ReaderAt
Expand All @@ -67,8 +65,18 @@ type (
io.ReadCloser
io.ReaderAt
}
LomWriter interface {
io.WriteCloser
Sync() error
}
)

// implementations
// readers: implementations
type (
sizedReader struct {
io.Reader
size int64
}

nopReader struct {
size int
Expand Down Expand Up @@ -100,7 +108,7 @@ type (
nopOpener struct{ io.ReadCloser }
)

// handles (and more readers)
// handles (and even more readers)
type (
FileHandle struct {
*os.File
Expand Down
20 changes: 9 additions & 11 deletions core/lfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,19 @@ func (lom *LOM) Open() (cos.LomReader, error) {
// create
//

func (lom *LOM) Create() (*os.File, error) {
func (lom *LOM) Create() (cos.LomWriter, error) {
debug.Assert(lom.isLockedExcl(), lom.Cname()) // caller must wlock
return lom._cf(lom.FQN, os.O_WRONLY)
return lom._cf(lom.FQN)
}

func (lom *LOM) CreateWork(wfqn string) (*os.File, error) {
return lom._cf(wfqn, os.O_WRONLY)
}
func (lom *LOM) CreateWork(wfqn string) (*os.File, error) { return lom._cf(wfqn) } // -> lom
func (lom *LOM) CreatePart(wfqn string) (*os.File, error) { return lom._cf(wfqn) } // TODO: differentiate
func (lom *LOM) CreateSlice(wfqn string) (*os.File, error) { return lom._cf(wfqn) } // TODO: ditto

func (lom *LOM) CreateWorkRW(wfqn string) (*os.File, error) {
return lom._cf(wfqn, os.O_RDWR)
}
const _openFlags = os.O_CREATE | os.O_WRONLY | os.O_TRUNC

func (lom *LOM) _cf(fqn string, mode int) (fh *os.File, err error) {
fh, err = os.OpenFile(fqn, os.O_CREATE|mode|os.O_TRUNC, cos.PermRWR)
func (lom *LOM) _cf(fqn string) (fh *os.File, err error) {
fh, err = os.OpenFile(fqn, _openFlags, cos.PermRWR)
if err == nil || !os.IsNotExist(err) {
return fh, err
}
Expand All @@ -54,7 +52,7 @@ func (lom *LOM) _cf(fqn string, mode int) (fh *os.File, err error) {
if err = cos.CreateDir(fdir); err != nil {
return nil, err
}
return os.OpenFile(fqn, os.O_CREATE|mode|os.O_TRUNC, cos.PermRWR)
return os.OpenFile(fqn, _openFlags, cos.PermRWR)
}

//
Expand Down
18 changes: 9 additions & 9 deletions core/lom_xattr.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (lom *LOM) lmfs(populate bool) (md *lmeta, err error) {
if !populate {
md = &lmeta{}
}
err = md.unmarshal(read)
err = md.unpack(read)
if err == nil {
_recomputeMdSize(size, mdSize)
} else {
Expand All @@ -165,7 +165,7 @@ func (lom *LOM) PersistMain() (err error) {
return
}
// write-immediate (default)
buf := lom.marshal()
buf := lom.pack()
if err = fs.SetXattr(lom.FQN, XattrLOM, buf); err != nil {
lom.Uncache()
T.FSHC(err, lom.FQN)
Expand Down Expand Up @@ -193,7 +193,7 @@ func (lom *LOM) Persist() (err error) {
return
}

buf := lom.marshal()
buf := lom.pack()
if err = fs.SetXattr(lom.FQN, XattrLOM, buf); err != nil {
lom.Uncache()
T.FSHC(err, lom.FQN)
Expand All @@ -211,7 +211,7 @@ func (lom *LOM) Persist() (err error) {
}

func (lom *LOM) persistMdOnCopies() (copyFQN string, err error) {
buf := lom.marshal()
buf := lom.pack()
// replicate across copies
for copyFQN = range lom.md.copies {
if copyFQN == lom.FQN {
Expand All @@ -237,7 +237,7 @@ func (lom *LOM) flushCold(md *lmeta, atime time.Time) {
if err := lom.syncMetaWithCopies(); err != nil {
return
}
buf := lom.marshal()
buf := lom.pack()
if err := fs.SetXattr(lom.FQN, XattrLOM, buf); err != nil {
T.FSHC(err, lom.FQN)
}
Expand All @@ -253,9 +253,9 @@ func (lom *LOM) flushAtime(atime time.Time) error {
return os.Chtimes(lom.FQN, atime, mtime)
}

func (lom *LOM) marshal() (buf []byte) {
func (lom *LOM) pack() (buf []byte) {
lmsize := g.maxLmeta.Load()
buf = lom.md.marshal(lmsize)
buf = lom.md.pack(lmsize)
size := int64(len(buf))
debug.Assert(size <= xattrMaxSize)
_recomputeMdSize(size, lmsize)
Expand Down Expand Up @@ -292,7 +292,7 @@ func (md *lmeta) poprt(saved []uint64) {
md.Atime, md.atimefs, md.lid = int64(saved[0]), saved[1], lomBID(saved[2])
}

func (md *lmeta) unmarshal(buf []byte) error {
func (md *lmeta) unpack(buf []byte) error {
const invalid = "invalid lmeta"
var (
payload string
Expand Down Expand Up @@ -404,7 +404,7 @@ func (md *lmeta) unmarshal(buf []byte) error {
return nil
}

func (md *lmeta) marshal(mdSize int64) (buf []byte) {
func (md *lmeta) pack(mdSize int64) (buf []byte) {
var (
b8 [cos.SizeofI64]byte
cksumType, cksumValue = md.Cksum.Get()
Expand Down
2 changes: 1 addition & 1 deletion ec/bckencodexact.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (r *XactBckEncode) afterECObj(lom *core.LOM, err error) {
if err == nil {
r.LomAdd(lom)
} else if err != errSkipped {
nlog.Errorf("Failed to erasure-code %s: %v", lom.Cname(), err)
nlog.Errorf("failed to erasure-code %s: %v", lom.Cname(), err)
}

r.wg.Done()
Expand Down
2 changes: 1 addition & 1 deletion ec/ec.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func Init() {
xreg.RegBckXact(&encFactory{})

if err := initManager(); err != nil {
cos.ExitLogf("Failed to init manager: %v", err)
cos.ExitLog("Failed to init manager:", err)
}
}

Expand Down
14 changes: 7 additions & 7 deletions ec/getjogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,18 +277,18 @@ func (c *getJogger) restoreReplicatedFromDisk(ctx *restoreCtx) error {

w, err := ctx.lom.CreateWork(tmpFQN)
if err != nil {
nlog.Errorf("Failed to create file: %v", err)
nlog.Errorf("failed to create file: %v", err)
break
}
iReqBuf := newIntraReq(reqGet, ctx.meta, ctx.lom.Bck()).NewPack(g.smm)
n, err = c.parent.readRemote(ctx.lom, node, uname, iReqBuf, w)
g.smm.Free(iReqBuf)

if err == nil && n != 0 {
// A valid replica is found - break and do close file handle
// found valid replica
err = cos.FlushClose(w)
if err != nil {
nlog.Errorf("Failed to flush and close: %v", err)
nlog.Errorf("failed to flush and close: %v", err)
break
}
ctx.lom.SetSize(n)
Expand Down Expand Up @@ -362,7 +362,7 @@ func (c *getJogger) requestSlices(ctx *restoreCtx) error {
if ctx.toDisk {
prefix := fmt.Sprintf("ec-restore-%d", v.SliceID)
fqn := fs.CSM.Gen(ctx.lom, fs.WorkfileType, prefix)
fh, err := ctx.lom.CreateWork(fqn)
fh, err := ctx.lom.CreateSlice(fqn)
if err != nil {
return err
}
Expand Down Expand Up @@ -420,7 +420,7 @@ func newSliceWriter(ctx *restoreCtx, writers []io.Writer, restored []*slice,
if ctx.toDisk {
prefix := fmt.Sprintf("ec-rebuild-%d", idx)
fqn := fs.CSM.Gen(ctx.lom, fs.WorkfileType, prefix)
file, err := ctx.lom.CreateWork(fqn)
file, err := ctx.lom.CreateSlice(fqn)
if err != nil {
return err
}
Expand Down Expand Up @@ -776,9 +776,9 @@ func (c *getJogger) restoreEncoded(ctx *restoreCtx) error {

// main replica is ready to download by a client.
if err := c.uploadRestoredSlices(ctx, restored); err != nil {
nlog.Errorf("Failed to upload restored slices of %s: %v", ctx.lom, err)
nlog.Errorf("failed to upload restored slices of %s: %v", ctx.lom, err)
} else if cmn.Rom.FastV(4, cos.SmoduleEC) {
nlog.Infof("Slices %s restored successfully", ctx.lom)
nlog.Infof("slices %s restored successfully", ctx.lom)
}

c.freeDownloaded(ctx)
Expand Down
2 changes: 1 addition & 1 deletion ec/getxaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func (r *XactGet) decode(req *request, lom *core.LOM) {
req.tm = time.Now()

if err := r.dispatchRequest(req, lom); err != nil {
nlog.Errorf("Failed to restore %s: %v", lom, err)
nlog.Errorf("failed to restore %s: %v", lom, err)
freeReq(req)
}
}
Expand Down
4 changes: 2 additions & 2 deletions ec/putjogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ func generateSlicesToDisk(ctx *encodeCtx) error {
cksumType := ctx.lom.CksumType()
for i := range ctx.paritySlices {
workFQN := fs.CSM.Gen(ctx.lom, fs.WorkfileType, fmt.Sprintf("ec-write-%d", i))
writer, err := ctx.lom.CreateWork(workFQN)
writer, err := ctx.lom.CreateSlice(workFQN)
if err != nil {
return err
}
Expand Down Expand Up @@ -495,7 +495,7 @@ func (c *putJogger) sendSlice(ctx *encodeCtx, data *slice, node *meta.Snode, idx
data.release()
}
if err != nil {
nlog.Errorln("Failed to send", hdr.Cname()+": ", err)
nlog.Errorln("failed to send", hdr.Cname()+": ", err)
}
}

Expand Down
4 changes: 2 additions & 2 deletions ec/putxaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (r *XactPut) encode(req *request, lom *core.LOM) {
now := time.Now()
req.putTime, req.tm = now, now
if err := r.dispatchRequest(req, lom); err != nil {
nlog.Errorf("Failed to encode %s: %v", lom, err)
nlog.Errorf("failed to encode %s: %v", lom, err)
freeReq(req)
}
}
Expand All @@ -226,7 +226,7 @@ func (r *XactPut) cleanup(req *request, lom *core.LOM) {
req.putTime, req.tm = now, now

if err := r.dispatchRequest(req, lom); err != nil {
nlog.Errorf("Failed to cleanup %s: %v", lom, err)
nlog.Errorf("failed to cleanup %s: %v", lom, err)
freeReq(req)
}
}
Expand Down
2 changes: 1 addition & 1 deletion ec/xaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func newSliceResponse(md *Metadata, attrs *cmn.ObjAttrs, fqn string) (reader cos
attrs.Size = stat.Size()
reader, err = cos.NewFileHandle(fqn)
if err != nil {
nlog.Warningf("Failed to read file stats: %s", err)
nlog.Warningln("failed to read file stats:", err)
return nil, err
}
return reader, nil
Expand Down

0 comments on commit 887bb05

Please sign in to comment.