Skip to content

Commit

Permalink
append to TAR: remove redundant fseek, simplify
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jun 22, 2024
1 parent 887bb05 commit 51a2273
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 36 deletions.
8 changes: 5 additions & 3 deletions ais/tgtobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -1651,14 +1651,15 @@ func (a *putA2I) do() (int, error) {
var (
err error
fh *os.File
offset int64
size int64
tarFormat tar.Format
workFQN = fs.CSM.Gen(a.lom, fs.WorkfileType, fs.WorkfileAppendToArch)
)
if err = a.lom.RenameMainTo(workFQN); err != nil {
return http.StatusInternalServerError, err
}
fh, tarFormat, err = archive.OpenTarSeekEnd(a.lom.Cname(), workFQN)
fh, tarFormat, offset, err = archive.OpenTarSeekEnd(a.lom.Cname(), workFQN)
if err != nil {
if errV := a.lom.RenameToMain(workFQN); errV != nil {
return http.StatusInternalServerError, errV
Expand All @@ -1670,7 +1671,7 @@ func (a *putA2I) do() (int, error) {
return http.StatusInternalServerError, err
}
// do - fast
if size, err = a.fast(fh, tarFormat); err == nil {
if size, err = a.fast(fh, tarFormat, offset); err == nil {
// NOTE: checksum traded off
if err = a.finalize(size, cos.NoneCksum, workFQN); err == nil {
return http.StatusInternalServerError, nil // ok
Expand Down Expand Up @@ -1732,7 +1733,7 @@ cpap: // copy + append
}

// TAR only - fast & direct
func (a *putA2I) fast(rwfh *os.File, tarFormat tar.Format) (size int64, err error) {
func (a *putA2I) fast(rwfh *os.File, tarFormat tar.Format, offset int64) (size int64, err error) {
var (
buf, slab = a.t.gmm.AllocSize(a.size)
tw = tar.NewWriter(rwfh)
Expand All @@ -1750,6 +1751,7 @@ func (a *putA2I) fast(rwfh *os.File, tarFormat tar.Format) (size int64, err erro
cos.Close(tw)
if err == nil {
size, err = rwfh.Seek(0, io.SeekCurrent)
debug.Assert(err != nil || size > offset, size, " vs ", offset)
}
slab.Free(buf)
cos.Close(rwfh)
Expand Down
25 changes: 15 additions & 10 deletions cmn/archive/fast.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,33 @@ import (
// The blocks must be overwritten, otherwise newly added files won't be
// accessible. Different TAR formats (such as `ustar`, `pax` and `GNU`)
// write different number of zero blocks.
func OpenTarSeekEnd(cname, workFQN string) (rwfh *os.File, tarFormat tar.Format, err error) {
func OpenTarSeekEnd(cname, workFQN string) (rwfh *os.File, tarFormat tar.Format, offset int64, err error) {
if rwfh, err = os.OpenFile(workFQN, os.O_RDWR, cos.PermRWR); err != nil {
return
}
if tarFormat, err = _seekTarEnd(cname, rwfh); err != nil {
if tarFormat, offset, err = _seekTarEnd(cname, rwfh); err != nil {
rwfh.Close() // always close on err
}
return
}

func _seekTarEnd(cname string, fh *os.File) (tarFormat tar.Format, _ error) {
func _seekTarEnd(cname string, fh *os.File) (tarFormat tar.Format, offset int64, err error) {
var (
twr = tar.NewReader(fh)
size int64
pos = int64(-1)
unknown bool
)
for {
hdr, err := twr.Next()
var hdr *tar.Header
hdr, err = twr.Next()
if err != nil {
if err != io.EOF {
return tarFormat, err // invalid TAR format
return tarFormat, 0, err // invalid TAR format
}
// EOF
if pos < 0 {
return tarFormat, ErrTarIsEmpty
return tarFormat, 0, ErrTarIsEmpty // still ok
}
break
}
Expand All @@ -65,14 +66,18 @@ func _seekTarEnd(cname string, fh *os.File) (tarFormat tar.Format, _ error) {
pos, err = fh.Seek(0, io.SeekCurrent)
if err != nil {
debug.AssertNoErr(err) // unlikely
return tarFormat, err
return tarFormat, -1, err
}
size = hdr.Size
}
if pos == 0 {
return tarFormat, fmt.Errorf("failed to seek end of the TAR %s", cname)
return tarFormat, 0, fmt.Errorf("failed to seek end of the TAR %s", cname)
}

padded := cos.CeilAlignInt64(size, TarBlockSize)
_, err := fh.Seek(pos+padded, io.SeekStart)
return tarFormat, err
offset, err = fh.Seek(pos+padded, io.SeekStart)
debug.AssertNoErr(err)
debug.Assert(offset > 0)

return tarFormat, offset, err
}
42 changes: 19 additions & 23 deletions xact/xs/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,15 @@ func (r *XactArch) Begin(msg *cmn.ArchiveBckMsg, archlom *core.LOM) (err error)
// here and elsewhere: an extra check to make sure this target is active (ref: ignoreMaintenance)
smap := core.T.Sowner().Get()
if err = core.InMaintOrDecomm(smap, core.T.Snode(), r); err != nil {
return
return err
}
nat := smap.CountActiveTs()
wi.refc.Store(int32(nat - 1))

wi.tsi, err = smap.HrwName2T(msg.ToBck.MakeUname(msg.ArchName))
if err != nil {
r.AddErr(err, 4, cos.SmoduleXs)
return
return err
}

// fcreate at BEGIN time
Expand All @@ -159,7 +159,7 @@ func (r *XactArch) Begin(msg *cmn.ArchiveBckMsg, archlom *core.LOM) (err error)
wi.wfh, err = wi.archlom.CreateWork(wi.fqn)
}
if err != nil {
return
return err
}
if cmn.Rom.FastV(5, cos.SmoduleXs) {
nlog.Infof("%s: begin%s %s", r.Base.Name(), s, msg.Cname())
Expand All @@ -175,7 +175,7 @@ func (r *XactArch) Begin(msg *cmn.ArchiveBckMsg, archlom *core.LOM) (err error)
if err != nil {
wi.writer.Fini()
wi.cleanup()
return
return err
}
}
}
Expand All @@ -191,7 +191,7 @@ func (r *XactArch) Begin(msg *cmn.ArchiveBckMsg, archlom *core.LOM) (err error)
r.pending.m[msg.TxnUUID] = wi
r.wiCnt.Inc()
r.pending.Unlock()
return
return nil
}

func (r *XactArch) Do(msg *cmn.ArchiveBckMsg) {
Expand Down Expand Up @@ -434,47 +434,43 @@ func (r *XactArch) Snap() (snap *core.Snap) {
func (wi *archwi) beginAppend() (lmfh cos.LomReader, err error) {
msg := wi.msg
if msg.Mime == archive.ExtTar {
if err = wi.openTarForAppend(); err == nil || err != archive.ErrTarIsEmpty {
return
err = wi.openTarForAppend()
if err == nil /*can append*/ || err != archive.ErrTarIsEmpty /*fail Begin*/ {
return nil, err
}
}
// msg.Mime has been already validated (see ais/* for apc.ActArchive)

// prep to copy `lmfh` --> `wi.fh` with subsequent APPEND-ing
// msg.Mime has been already validated (see ais/* for apc.ActArchive)
lmfh, err = wi.archlom.Open()
if err != nil {
return
return nil, err
}
if wi.wfh, err = wi.archlom.CreateWork(wi.fqn); err != nil {
cos.Close(lmfh)
lmfh = nil
}
return
return lmfh, err
}

func (wi *archwi) openTarForAppend() (err error) {
if err = wi.archlom.RenameMainTo(wi.fqn); err != nil {
return
return err
}
// open (rw) lom itself
wi.wfh, wi.tarFormat, err = archive.OpenTarSeekEnd(wi.archlom.ObjName, wi.fqn)
if err != nil {
goto roll
}
wi.appendPos, err = wi.wfh.Seek(0, io.SeekCurrent)
wi.wfh, wi.tarFormat, wi.appendPos, err = archive.OpenTarSeekEnd(wi.archlom.Cname(), wi.fqn)
if err == nil {
return // can append
return nil // can append
}
wi.appendPos, wi.tarFormat = 0, tar.FormatUnknown // reset
cos.Close(wi.wfh)
wi.wfh = nil
roll:

// back
if errV := wi.archlom.RenameToMain(wi.fqn); errV != nil {
nlog.Errorf("%s: nested error: failed to append %s (%v) and rename back from %s (%v)",
wi.tsi, wi.archlom, err, wi.fqn, errV)
} else {
wi.fqn = ""
}
return
return err
}

// multi-object iterator i/f: "handle work item"
Expand Down Expand Up @@ -564,7 +560,7 @@ func (wi *archwi) finalize() (int64, error) {
if err != nil {
return 0, err
}
debug.Assertf(size > wi.appendPos, "%d vs %d", size, wi.appendPos)
debug.Assert(size > wi.appendPos, size, " vs ", wi.appendPos)
// checksum traded off
wi.archlom.SetCksum(cos.NewCksum(cos.ChecksumNone, ""))
return size, nil
Expand Down

0 comments on commit 51a2273

Please sign in to comment.