diff --git a/ais/tgtfcold.go b/ais/tgtfcold.go index 516707bc520..e2186f8950f 100644 --- a/ais/tgtfcold.go +++ b/ais/tgtfcold.go @@ -6,7 +6,6 @@ package ais import ( "io" - "os" "github.com/NVIDIA/aistore/ais/s3" "github.com/NVIDIA/aistore/cmn" @@ -28,7 +27,7 @@ func (goi *getOI) coldReopen(res *core.GetReaderResult) error { var ( err error lmfh cos.LomReader - wfh *os.File + wfh cos.LomWriter t = goi.t lom = goi.lom revert string diff --git a/ais/tgtobj.go b/ais/tgtobj.go index 239be3f8e6a..54946de4d02 100644 --- a/ais/tgtobj.go +++ b/ais/tgtobj.go @@ -1251,14 +1251,14 @@ func (a *apndOI) apnd(buf []byte) (packedHdl string, ecode int, err error) { ecode = http.StatusInternalServerError return } - fh, err = os.OpenFile(workFQN, os.O_APPEND|os.O_WRONLY, cos.PermRWR) + fh, err = os.OpenFile(workFQN, os.O_APPEND|os.O_WRONLY, cos.PermRWR) // O_APPEND } else { a.lom.Unlock(false) a.hdl.partialCksum = cos.NewCksumHash(a.lom.CksumType()) fh, err = a.lom.CreateWork(workFQN) } } else { - fh, err = os.OpenFile(workFQN, os.O_APPEND|os.O_WRONLY, cos.PermRWR) + fh, err = os.OpenFile(workFQN, os.O_APPEND|os.O_WRONLY, cos.PermRWR) // O_APPEND debug.Assert(a.hdl.partialCksum != nil) } if err != nil { // failed to open or create diff --git a/ais/tgts3mpt.go b/ais/tgts3mpt.go index 6fbcc00d9c5..70ff39e4848 100644 --- a/ais/tgts3mpt.go +++ b/ais/tgts3mpt.go @@ -122,7 +122,7 @@ func (t *target) putMptPart(w http.ResponseWriter, r *http.Request, items []stri // workfile name format: .. prefix := uploadID + "." + strconv.FormatInt(int64(partNum), 10) wfqn := fs.CSM.Gen(lom, fs.WorkfileType, prefix) - partFh, errC := lom.CreateWorkRW(wfqn) + partFh, errC := lom.CreatePart(wfqn) if errC != nil { s3.WriteMptErr(w, r, errC, 0, lom, uploadID) return diff --git a/cmn/cos/io.go b/cmn/cos/io.go index 9ddc84cca31..a59e4ad3ef3 100644 --- a/cmn/cos/io.go +++ b/cmn/cos/io.go @@ -33,32 +33,30 @@ const ContentLengthUnknown = -1 const PathSeparator = string(filepath.Separator) -// readers +// follows below: +// - readers: interfaces +// - readers: implementations +// - handles +// - writers + +// readers: interfaces type ( ReadOpenCloser interface { io.ReadCloser Open() (ReadOpenCloser, error) } - // ReadSizer is the interface that adds Size method to io.Reader. ReadSizer interface { io.Reader Size() int64 } - // ReadCloseSizer is the interface that adds Size method to io.ReadCloser. ReadCloseSizer interface { io.ReadCloser Size() int64 } - // ReadOpenCloseSizer is the interface that adds Size method to ReadOpenCloser. - ReadOpenCloseSizer interface { + ReadOpenCloseSizer interface { // see sizedReader below ReadOpenCloser Size() int64 } - sizedReader struct { - io.Reader - size int64 - } - ReadReaderAt interface { io.Reader io.ReaderAt @@ -67,8 +65,18 @@ type ( io.ReadCloser io.ReaderAt } + LomWriter interface { + io.WriteCloser + Sync() error + } +) - // implementations +// readers: implementations +type ( + sizedReader struct { + io.Reader + size int64 + } nopReader struct { size int @@ -100,7 +108,7 @@ type ( nopOpener struct{ io.ReadCloser } ) -// handles (and more readers) +// handles (and even more readers) type ( FileHandle struct { *os.File diff --git a/core/lfile.go b/core/lfile.go index 59ec30e341f..ec1adc3d7c9 100644 --- a/core/lfile.go +++ b/core/lfile.go @@ -26,21 +26,19 @@ func (lom *LOM) Open() (cos.LomReader, error) { // create // -func (lom *LOM) Create() (*os.File, error) { +func (lom *LOM) Create() (cos.LomWriter, error) { debug.Assert(lom.isLockedExcl(), lom.Cname()) // caller must wlock - return lom._cf(lom.FQN, os.O_WRONLY) + return lom._cf(lom.FQN) } -func (lom *LOM) CreateWork(wfqn string) (*os.File, error) { - return lom._cf(wfqn, os.O_WRONLY) -} +func (lom *LOM) CreateWork(wfqn string) (*os.File, error) { return lom._cf(wfqn) } // -> lom +func (lom *LOM) CreatePart(wfqn string) (*os.File, error) { return lom._cf(wfqn) } // TODO: differentiate +func (lom *LOM) CreateSlice(wfqn string) (*os.File, error) { return lom._cf(wfqn) } // TODO: ditto -func (lom *LOM) CreateWorkRW(wfqn string) (*os.File, error) { - return lom._cf(wfqn, os.O_RDWR) -} +const _openFlags = os.O_CREATE | os.O_WRONLY | os.O_TRUNC -func (lom *LOM) _cf(fqn string, mode int) (fh *os.File, err error) { - fh, err = os.OpenFile(fqn, os.O_CREATE|mode|os.O_TRUNC, cos.PermRWR) +func (lom *LOM) _cf(fqn string) (fh *os.File, err error) { + fh, err = os.OpenFile(fqn, _openFlags, cos.PermRWR) if err == nil || !os.IsNotExist(err) { return fh, err } @@ -54,7 +52,7 @@ func (lom *LOM) _cf(fqn string, mode int) (fh *os.File, err error) { if err = cos.CreateDir(fdir); err != nil { return nil, err } - return os.OpenFile(fqn, os.O_CREATE|mode|os.O_TRUNC, cos.PermRWR) + return os.OpenFile(fqn, _openFlags, cos.PermRWR) } // diff --git a/core/lom_xattr.go b/core/lom_xattr.go index 49a149ae9e6..f2a2d2bf7c4 100644 --- a/core/lom_xattr.go +++ b/core/lom_xattr.go @@ -146,7 +146,7 @@ func (lom *LOM) lmfs(populate bool) (md *lmeta, err error) { if !populate { md = &lmeta{} } - err = md.unmarshal(read) + err = md.unpack(read) if err == nil { _recomputeMdSize(size, mdSize) } else { @@ -165,7 +165,7 @@ func (lom *LOM) PersistMain() (err error) { return } // write-immediate (default) - buf := lom.marshal() + buf := lom.pack() if err = fs.SetXattr(lom.FQN, XattrLOM, buf); err != nil { lom.Uncache() T.FSHC(err, lom.FQN) @@ -193,7 +193,7 @@ func (lom *LOM) Persist() (err error) { return } - buf := lom.marshal() + buf := lom.pack() if err = fs.SetXattr(lom.FQN, XattrLOM, buf); err != nil { lom.Uncache() T.FSHC(err, lom.FQN) @@ -211,7 +211,7 @@ func (lom *LOM) Persist() (err error) { } func (lom *LOM) persistMdOnCopies() (copyFQN string, err error) { - buf := lom.marshal() + buf := lom.pack() // replicate across copies for copyFQN = range lom.md.copies { if copyFQN == lom.FQN { @@ -237,7 +237,7 @@ func (lom *LOM) flushCold(md *lmeta, atime time.Time) { if err := lom.syncMetaWithCopies(); err != nil { return } - buf := lom.marshal() + buf := lom.pack() if err := fs.SetXattr(lom.FQN, XattrLOM, buf); err != nil { T.FSHC(err, lom.FQN) } @@ -253,9 +253,9 @@ func (lom *LOM) flushAtime(atime time.Time) error { return os.Chtimes(lom.FQN, atime, mtime) } -func (lom *LOM) marshal() (buf []byte) { +func (lom *LOM) pack() (buf []byte) { lmsize := g.maxLmeta.Load() - buf = lom.md.marshal(lmsize) + buf = lom.md.pack(lmsize) size := int64(len(buf)) debug.Assert(size <= xattrMaxSize) _recomputeMdSize(size, lmsize) @@ -292,7 +292,7 @@ func (md *lmeta) poprt(saved []uint64) { md.Atime, md.atimefs, md.lid = int64(saved[0]), saved[1], lomBID(saved[2]) } -func (md *lmeta) unmarshal(buf []byte) error { +func (md *lmeta) unpack(buf []byte) error { const invalid = "invalid lmeta" var ( payload string @@ -404,7 +404,7 @@ func (md *lmeta) unmarshal(buf []byte) error { return nil } -func (md *lmeta) marshal(mdSize int64) (buf []byte) { +func (md *lmeta) pack(mdSize int64) (buf []byte) { var ( b8 [cos.SizeofI64]byte cksumType, cksumValue = md.Cksum.Get() diff --git a/ec/bckencodexact.go b/ec/bckencodexact.go index 7d19e805575..8e5e65120d0 100644 --- a/ec/bckencodexact.go +++ b/ec/bckencodexact.go @@ -123,7 +123,7 @@ func (r *XactBckEncode) afterECObj(lom *core.LOM, err error) { if err == nil { r.LomAdd(lom) } else if err != errSkipped { - nlog.Errorf("Failed to erasure-code %s: %v", lom.Cname(), err) + nlog.Errorf("failed to erasure-code %s: %v", lom.Cname(), err) } r.wg.Done() diff --git a/ec/ec.go b/ec/ec.go index c7b9b0d74d4..99bce7d7574 100644 --- a/ec/ec.go +++ b/ec/ec.go @@ -207,7 +207,7 @@ func Init() { xreg.RegBckXact(&encFactory{}) if err := initManager(); err != nil { - cos.ExitLogf("Failed to init manager: %v", err) + cos.ExitLog("Failed to init manager:", err) } } diff --git a/ec/getjogger.go b/ec/getjogger.go index 4e3e34367af..4f26dc904fb 100644 --- a/ec/getjogger.go +++ b/ec/getjogger.go @@ -277,7 +277,7 @@ func (c *getJogger) restoreReplicatedFromDisk(ctx *restoreCtx) error { w, err := ctx.lom.CreateWork(tmpFQN) if err != nil { - nlog.Errorf("Failed to create file: %v", err) + nlog.Errorf("failed to create file: %v", err) break } iReqBuf := newIntraReq(reqGet, ctx.meta, ctx.lom.Bck()).NewPack(g.smm) @@ -285,10 +285,10 @@ func (c *getJogger) restoreReplicatedFromDisk(ctx *restoreCtx) error { g.smm.Free(iReqBuf) if err == nil && n != 0 { - // A valid replica is found - break and do close file handle + // found valid replica err = cos.FlushClose(w) if err != nil { - nlog.Errorf("Failed to flush and close: %v", err) + nlog.Errorf("failed to flush and close: %v", err) break } ctx.lom.SetSize(n) @@ -362,7 +362,7 @@ func (c *getJogger) requestSlices(ctx *restoreCtx) error { if ctx.toDisk { prefix := fmt.Sprintf("ec-restore-%d", v.SliceID) fqn := fs.CSM.Gen(ctx.lom, fs.WorkfileType, prefix) - fh, err := ctx.lom.CreateWork(fqn) + fh, err := ctx.lom.CreateSlice(fqn) if err != nil { return err } @@ -420,7 +420,7 @@ func newSliceWriter(ctx *restoreCtx, writers []io.Writer, restored []*slice, if ctx.toDisk { prefix := fmt.Sprintf("ec-rebuild-%d", idx) fqn := fs.CSM.Gen(ctx.lom, fs.WorkfileType, prefix) - file, err := ctx.lom.CreateWork(fqn) + file, err := ctx.lom.CreateSlice(fqn) if err != nil { return err } @@ -776,9 +776,9 @@ func (c *getJogger) restoreEncoded(ctx *restoreCtx) error { // main replica is ready to download by a client. if err := c.uploadRestoredSlices(ctx, restored); err != nil { - nlog.Errorf("Failed to upload restored slices of %s: %v", ctx.lom, err) + nlog.Errorf("failed to upload restored slices of %s: %v", ctx.lom, err) } else if cmn.Rom.FastV(4, cos.SmoduleEC) { - nlog.Infof("Slices %s restored successfully", ctx.lom) + nlog.Infof("slices %s restored successfully", ctx.lom) } c.freeDownloaded(ctx) diff --git a/ec/getxaction.go b/ec/getxaction.go index 6bceef06b71..148cfb46f58 100644 --- a/ec/getxaction.go +++ b/ec/getxaction.go @@ -250,7 +250,7 @@ func (r *XactGet) decode(req *request, lom *core.LOM) { req.tm = time.Now() if err := r.dispatchRequest(req, lom); err != nil { - nlog.Errorf("Failed to restore %s: %v", lom, err) + nlog.Errorf("failed to restore %s: %v", lom, err) freeReq(req) } } diff --git a/ec/putjogger.go b/ec/putjogger.go index f9a435d26c1..996e5905713 100644 --- a/ec/putjogger.go +++ b/ec/putjogger.go @@ -448,7 +448,7 @@ func generateSlicesToDisk(ctx *encodeCtx) error { cksumType := ctx.lom.CksumType() for i := range ctx.paritySlices { workFQN := fs.CSM.Gen(ctx.lom, fs.WorkfileType, fmt.Sprintf("ec-write-%d", i)) - writer, err := ctx.lom.CreateWork(workFQN) + writer, err := ctx.lom.CreateSlice(workFQN) if err != nil { return err } @@ -495,7 +495,7 @@ func (c *putJogger) sendSlice(ctx *encodeCtx, data *slice, node *meta.Snode, idx data.release() } if err != nil { - nlog.Errorln("Failed to send", hdr.Cname()+": ", err) + nlog.Errorln("failed to send", hdr.Cname()+": ", err) } } diff --git a/ec/putxaction.go b/ec/putxaction.go index de4715d6516..3210d9ffa67 100644 --- a/ec/putxaction.go +++ b/ec/putxaction.go @@ -215,7 +215,7 @@ func (r *XactPut) encode(req *request, lom *core.LOM) { now := time.Now() req.putTime, req.tm = now, now if err := r.dispatchRequest(req, lom); err != nil { - nlog.Errorf("Failed to encode %s: %v", lom, err) + nlog.Errorf("failed to encode %s: %v", lom, err) freeReq(req) } } @@ -226,7 +226,7 @@ func (r *XactPut) cleanup(req *request, lom *core.LOM) { req.putTime, req.tm = now, now if err := r.dispatchRequest(req, lom); err != nil { - nlog.Errorf("Failed to cleanup %s: %v", lom, err) + nlog.Errorf("failed to cleanup %s: %v", lom, err) freeReq(req) } } diff --git a/ec/xaction.go b/ec/xaction.go index 4a9ea563b18..f1218bd0fca 100644 --- a/ec/xaction.go +++ b/ec/xaction.go @@ -119,7 +119,7 @@ func newSliceResponse(md *Metadata, attrs *cmn.ObjAttrs, fqn string) (reader cos attrs.Size = stat.Size() reader, err = cos.NewFileHandle(fqn) if err != nil { - nlog.Warningf("Failed to read file stats: %s", err) + nlog.Warningln("failed to read file stats:", err) return nil, err } return reader, nil