Skip to content

Commit

Permalink
core: return object reader, not file
Browse files Browse the repository at this point in the history
* `cos.LomReader`

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jun 21, 2024
1 parent f882ef4 commit 3816bfb
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 28 deletions.
33 changes: 19 additions & 14 deletions ais/tgtfcold.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,21 @@ const ftcg = "Warning: failed to cold-GET"

func (goi *getOI) coldReopen(res *core.GetReaderResult) error {
var (
t, lom = goi.t, goi.lom
err error
lmfh cos.LomReader
wfh *os.File
t = goi.t
lom = goi.lom
revert string
)
if goi.verchanged {
revert = fs.CSM.Gen(lom, fs.WorkfileType, fs.WorkfileColdget)
if err := lom.RenameMainTo(revert); err != nil {
nlog.Errorln("failed to rename prev. version - proceeding anyway", lom.FQN, "=>", revert)
if errV := lom.RenameMainTo(revert); errV != nil {
nlog.Errorln("failed to rename prev. version - proceeding anyway", lom.Cname(), "=>", revert)
revert = ""
}
}
lmfh, err := lom.Create()
wfh, err = lom.Create()
if err != nil {
cos.Close(res.R)
goi._cleanup(revert, nil, nil, nil, err, "(fcreate)")
Expand All @@ -48,28 +52,28 @@ func (goi *getOI) coldReopen(res *core.GetReaderResult) error {
written int64
buf, slab = t.gmm.AllocSize(min(res.Size, memsys.DefaultBuf2Size))
cksumH = cos.NewCksumHash(lom.CksumConf().Type)
mw = cos.NewWriterMulti(lmfh, cksumH.H)
mw = cos.NewWriterMulti(wfh, cksumH.H)
)
written, err = cos.CopyBuffer(mw, res.R, buf)
cos.Close(res.R)

if err != nil {
goi._cleanup(revert, lmfh, buf, slab, err, "(rr/wl)")
goi._cleanup(revert, wfh, buf, slab, err, "(rr/wl)")
return err
}
debug.Assertf(written == res.Size, "%s: remote-size %d != %d written", lom.Cname(), res.Size, written)

if lom.IsFeatureSet(feat.FsyncPUT) {
// fsync (flush)
if err = lmfh.Sync(); err != nil {
goi._cleanup(revert, lmfh, buf, slab, err, "(fsync)")
if err = wfh.Sync(); err != nil {
goi._cleanup(revert, wfh, buf, slab, err, "(fsync)")
return err
}
}
err = lmfh.Close()
lmfh = nil
err = wfh.Close()
wfh = nil
if err != nil {
goi._cleanup(revert, lmfh, buf, slab, err, "(fclose)")
goi._cleanup(revert, wfh, buf, slab, err, "(fclose)")
return err
}

Expand All @@ -83,12 +87,13 @@ func (goi *getOI) coldReopen(res *core.GetReaderResult) error {
}
}
if err = lom.PersistMain(); err != nil {
goi._cleanup(revert, lmfh, buf, slab, err, "(persist)")
goi._cleanup(revert, wfh, buf, slab, err, "(persist)")
return err
}

// reopen & transmit ---
if lmfh, err = lom.Open(); err != nil {
lmfh, err = lom.Open()
if err != nil {
goi._cleanup(revert, nil, buf, slab, err, "(seek)")
return err
}
Expand Down Expand Up @@ -168,7 +173,7 @@ func (goi *getOI) _fini(revert string, fullSize, txSize int64) error {
return nil
}

func (goi *getOI) _cleanup(revert string, lmfh *os.File, buf []byte, slab *memsys.Slab, err error, tag string) {
func (goi *getOI) _cleanup(revert string, lmfh io.Closer, buf []byte, slab *memsys.Slab, err error, tag string) {
if lmfh != nil {
lmfh.Close()
}
Expand Down
11 changes: 6 additions & 5 deletions ais/tgtobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -1683,11 +1683,12 @@ func (a *putA2I) do() (int, error) {

cpap: // copy + append
var (
err error
lmfh, wfh *os.File
workFQN string
cksum cos.CksumHashSize
aw archive.Writer
err error
wfh *os.File
lmfh cos.LomReader
workFQN string
cksum cos.CksumHashSize
aw archive.Writer
)
workFQN = fs.CSM.Gen(a.lom, fs.WorkfileType, fs.WorkfileAppendToArch)
wfh, err = os.OpenFile(workFQN, os.O_CREATE|os.O_WRONLY, cos.PermRWR)
Expand Down
13 changes: 9 additions & 4 deletions cmn/cos/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,21 @@ type (
size int64
}

ReadReaderAt interface {
io.Reader
io.ReaderAt
}
LomReader interface {
io.ReadCloser
io.ReaderAt
}

// implementations

nopReader struct {
size int
offset int
}
ReadReaderAt interface {
io.Reader
io.ReaderAt
}
deferRCS struct {
ReadCloseSizer
cb func()
Expand Down
3 changes: 1 addition & 2 deletions core/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"context"
"io"
"net/http"
"os"

"github.com/NVIDIA/aistore/api/apc"
"github.com/NVIDIA/aistore/cmn"
Expand All @@ -27,7 +26,7 @@ type (
}
LsoInvCtx struct {
Lom *LOM
Lmfh *os.File
Lmfh cos.LomReader
Name string
ID string
Schema []string
Expand Down
2 changes: 1 addition & 1 deletion core/lfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
// open
//

func (lom *LOM) Open() (*os.File, error) {
func (lom *LOM) Open() (cos.LomReader, error) {
return os.Open(lom.FQN)
}

Expand Down
4 changes: 2 additions & 2 deletions xact/xs/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (r *XactArch) Begin(msg *cmn.ArchiveBckMsg, archlom *core.LOM) (err error)
if core.T.SID() == wi.tsi.ID() {
var (
s string
lmfh *os.File
lmfh cos.LomReader
finfo, errX = os.Stat(wi.archlom.FQN)
exists = errX == nil
)
Expand Down Expand Up @@ -431,7 +431,7 @@ func (r *XactArch) Snap() (snap *core.Snap) {
// archwi //
////////////

func (wi *archwi) beginAppend() (lmfh *os.File, err error) {
func (wi *archwi) beginAppend() (lmfh cos.LomReader, err error) {
msg := wi.msg
if msg.Mime == archive.ExtTar {
if err = wi.openTarForAppend(); err == nil || err != archive.ErrTarIsEmpty {
Expand Down

0 comments on commit 3816bfb

Please sign in to comment.