Skip to content

Commit

Permalink
carlog: Cope with partial offloads more correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Jan 27, 2024
1 parent 4fe9d3d commit cfccf5e
Showing 1 changed file with 92 additions and 80 deletions.
172 changes: 92 additions & 80 deletions carlog/carlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"math/bits"
"os"
"path/filepath"
"runtime/debug"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -277,46 +278,31 @@ func Open(staging CarStorageProvider, indexPath, dataPath string, tc TruncCleanu
}, nil
}

if h.External {
idx, err := OpenBSSTIndex(filepath.Join(indexPath, BsstIndexCanon))
// open data
dataFile, err := os.OpenFile(blkLogPath, os.O_RDWR|os.O_SYNC, 0666)
noDataFile := os.IsNotExist(err) && h.External && !h.Offloaded // External files may have data available only on external storage before being only available on Filecoin
var dataLen int64

if !noDataFile {
if err != nil {
return nil, xerrors.Errorf("opening bsst index: %w", err)
return nil, xerrors.Errorf("opening data: %w", err)
}

if staging == nil {
return nil, xerrors.Errorf("carlog: External but staging storage not set")
// check if data needs to be replayed/truncated
dataInfo, err := dataFile.Stat()
if err != nil {
return nil, xerrors.Errorf("stat data len: %w", err)
}

return &CarLog{
staging: staging,

head: headFile,

IndexPath: indexPath,
DataPath: dataPath,

eIdx: idx,
if dataInfo.Size() < h.RetiredAt {
// something is not yes

layerOffsets: h.LayerOffsets,
}, nil
}

// open data
dataFile, err := os.OpenFile(blkLogPath, os.O_RDWR|os.O_SYNC, 0666)
if err != nil {
return nil, xerrors.Errorf("opening data: %w", err)
}

// check if data needs to be replayed/truncated
dataInfo, err := dataFile.Stat()
if err != nil {
return nil, xerrors.Errorf("stat data len: %w", err)
}

if dataInfo.Size() < h.RetiredAt {
// something is not yes
return nil, xerrors.Errorf("data file is shorter than head says it should be (%d < %d)", dataInfo.Size(), h.RetiredAt)
}

return nil, xerrors.Errorf("data file is shorter than head says it should be (%d < %d)", dataInfo.Size(), h.RetiredAt)
dataLen = dataInfo.Size()
} else {
dataFile = nil // it's almost definitely nil already, but just to be sure
}

jb := &CarLog{
Expand All @@ -326,7 +312,7 @@ func Open(staging CarStorageProvider, indexPath, dataPath string, tc TruncCleanu
DataPath: dataPath,
head: headFile,
data: dataFile,
dataLen: dataInfo.Size(),
dataLen: dataLen,

dataStart: h.DataStart,
dataEnd: h.DataEnd,
Expand All @@ -336,67 +322,86 @@ func Open(staging CarStorageProvider, indexPath, dataPath string, tc TruncCleanu
}

// open index
if h.Finalized {
// bsst, read only
idx, err := OpenBSSTIndex(filepath.Join(indexPath, BsstIndex))
if h.External {
// External but not offloaded - we still have the data local, in need of commp

idx, err := OpenBSSTIndex(filepath.Join(indexPath, BsstIndexCanon))
if err != nil {
return nil, xerrors.Errorf("opening bsst index: %w", err)
}

jb.rIdx = idx
} else {
idx, err := OpenLevelDBIndex(filepath.Join(indexPath, LevelIndex), false)
if err != nil && !os.IsNotExist(err) {
return nil, xerrors.Errorf("opening leveldb index: %w", err)
if staging == nil {
return nil, xerrors.Errorf("carlog: External but staging storage not set")
}
if os.IsNotExist(err) {
log.Errorw("leveldb index missing, attempting to fix", "error", err, "path", filepath.Join(indexPath, LevelIndex))

idx, err = OpenLevelDBIndex(filepath.Join(indexPath, LevelIndex+".temp"), true)
jb.eIdx = idx
} else {
if h.Finalized {
// bsst, read only
idx, err := OpenBSSTIndex(filepath.Join(indexPath, BsstIndex))
if err != nil {
return nil, xerrors.Errorf("creating (fixLevelIndex) leveldb index: %w", err)
return nil, xerrors.Errorf("opening bsst index: %w", err)
}

err := jb.fixLevelIndex(h, idx)
if err != nil {
log.Errorw("fixing leveldb index failed", "error", err, "path", filepath.Join(indexPath, LevelIndex))
return nil, xerrors.Errorf("fixing leveldb index: %w", err)
jb.rIdx = idx
} else {
idx, err := OpenLevelDBIndex(filepath.Join(indexPath, LevelIndex), false)
if err != nil && !os.IsNotExist(err) {
return nil, xerrors.Errorf("opening leveldb index: %w", err)
}
if os.IsNotExist(err) {
log.Errorw("leveldb index missing, attempting to fix", "error", err, "path", filepath.Join(indexPath, LevelIndex))

if err := idx.Close(); err != nil {
log.Errorw("closing temp level index failed", "error", err, "path", filepath.Join(indexPath, LevelIndex))
return nil, xerrors.Errorf("closing temp level index: %w", err)
}
idx, err = OpenLevelDBIndex(filepath.Join(indexPath, LevelIndex+".temp"), true)
if err != nil {
return nil, xerrors.Errorf("creating (fixLevelIndex) leveldb index: %w", err)
}

if err := os.RemoveAll(filepath.Join(indexPath, LevelIndex)); err != nil {
log.Errorw("removing old level index failed", "error", err, "path", filepath.Join(indexPath, LevelIndex))
return nil, xerrors.Errorf("removing old level index: %w", err)
}
err := jb.fixLevelIndex(h, idx)
if err != nil {
log.Errorw("fixing leveldb index failed", "error", err, "path", filepath.Join(indexPath, LevelIndex))
return nil, xerrors.Errorf("fixing leveldb index: %w", err)
}

if err := os.Rename(filepath.Join(indexPath, LevelIndex+".temp"), filepath.Join(indexPath, LevelIndex)); err != nil {
log.Errorw("renaming temp level index failed", "error", err, "path", filepath.Join(indexPath, LevelIndex))
return nil, xerrors.Errorf("renaming temp level index: %w", err)
}
if err := idx.Close(); err != nil {
log.Errorw("closing temp level index failed", "error", err, "path", filepath.Join(indexPath, LevelIndex))
return nil, xerrors.Errorf("closing temp level index: %w", err)
}

idx, err = OpenLevelDBIndex(filepath.Join(indexPath, LevelIndex), false)
if err != nil {
log.Errorw("opening fixed leveldb index failed", "error", err, "path", filepath.Join(indexPath, LevelIndex))
return nil, xerrors.Errorf("opening fixed leveldb index: %w", err)
}
if err := os.RemoveAll(filepath.Join(indexPath, LevelIndex)); err != nil {
log.Errorw("removing old level index failed", "error", err, "path", filepath.Join(indexPath, LevelIndex))
return nil, xerrors.Errorf("removing old level index: %w", err)
}

log.Errorw("leveldb index fixed", "path", filepath.Join(indexPath, LevelIndex))
}
if err := os.Rename(filepath.Join(indexPath, LevelIndex+".temp"), filepath.Join(indexPath, LevelIndex)); err != nil {
log.Errorw("renaming temp level index failed", "error", err, "path", filepath.Join(indexPath, LevelIndex))
return nil, xerrors.Errorf("renaming temp level index: %w", err)
}

jb.rIdx = idx
idx, err = OpenLevelDBIndex(filepath.Join(indexPath, LevelIndex), false)
if err != nil {
log.Errorw("opening fixed leveldb index failed", "error", err, "path", filepath.Join(indexPath, LevelIndex))
return nil, xerrors.Errorf("opening fixed leveldb index: %w", err)
}

if h.ReadOnly {
// todo start finalize
// (this should happen through group mgr)
} else {
jb.wIdx = idx
log.Errorw("leveldb index fixed", "path", filepath.Join(indexPath, LevelIndex))
}

jb.rIdx = idx

if h.ReadOnly {
// todo start finalize
// (this should happen through group mgr)
} else {
jb.wIdx = idx
}
}
}

if noDataFile { // external, not fully offloaded, no local data file
return jb, nil
}

if h.DataEnd > 0 && len(h.LayerOffsets) == 0 {
if err := dataFile.Truncate(h.DataEnd); err != nil {
return nil, xerrors.Errorf("truncate data file (cut unfinished top tree): %w", err)
Expand All @@ -412,25 +417,26 @@ func Open(staging CarStorageProvider, indexPath, dataPath string, tc TruncCleanu
return nil, xerrors.Errorf("mutating head: %w", err)
}

dataInfo, err = dataFile.Stat()
dataInfo, err := dataFile.Stat()
if err != nil {
return nil, xerrors.Errorf("stat top-trunc data len: %w", err)
}
dataLen = dataInfo.Size()
}

// todo right now we're calling this on every startup when writable
// a better way to do that would be to have a track that we're in a "clean"
// state, or even to have two indexes, one for clean and one for dirty data
if dataInfo.Size() > h.RetiredAt || jb.wIdx != nil { // data ahead means there was an unclean shutdown during a write
if dataLen > h.RetiredAt || jb.wIdx != nil { // data ahead means there was an unclean shutdown during a write
if h.ReadOnly {
// If the file is truncated while being marked as read-only, something is terribly wrong.
return nil, xerrors.Errorf("read only(!) data file is shorter than head says it should be (%d < %d)", dataInfo.Size(), h.RetiredAt)
return nil, xerrors.Errorf("read only(!) data file is shorter than head says it should be (%d < %d)", dataLen, h.RetiredAt)
}

//return nil, xerrors.Errorf("data file is shorter than head says it should be (%d < %d)", dataInfo.Size(), h.RetiredAt)

// truncate index
err := jb.truncate(h.RetiredAt, dataInfo.Size(), tc)
err := jb.truncate(h.RetiredAt, dataLen, tc)
if err != nil {
return nil, xerrors.Errorf("truncating jbob: %w", err)
}
Expand Down Expand Up @@ -1544,7 +1550,7 @@ func (j *CarLog) WriteCar(w io.Writer) (int64, cid.Cid, error) {
// read root block, which is the only block in the last layer
rcid, node, err := carutil.ReadNode(layers[len(layers)-1].br)
if err != nil {
return 0, cid.Undef, xerrors.Errorf("reading root block: %w", err)
return 0, cid.Undef, xerrors.Errorf("reading root block (lo: %#v; rsat: %d, rsbase: %d; data: %v): %w", j.layerOffsets, layers[len(layers)-1].rs.(*readSeekerFromReaderAt).pos, layers[len(layers)-1].rs.(*readSeekerFromReaderAt).base, j.data, err)
}

// todo consider buffering the writes
Expand Down Expand Up @@ -1927,6 +1933,10 @@ type readSeekerFromReaderAt struct {
func (rs *readSeekerFromReaderAt) Read(p []byte) (n int, err error) {
n, err = rs.readerAt.ReadAt(p, rs.pos+rs.base)
rs.pos += int64(n)
if err != nil && err != io.EOF {
log.Errorw("READ SEEKER AT ERROR", "err", err, "plen", len(p), "pos", rs.pos, "base", rs.base, "n", n)
debug.PrintStack()
}
return n, err
}

Expand All @@ -1939,6 +1949,8 @@ func (rs *readSeekerFromReaderAt) Seek(offset int64, whence int) (int64, error)
case io.SeekEnd:
return 0, io.ErrUnexpectedEOF
default:
log.Errorw("READ SEEKER AT INVALID WHENCE", "whence", whence)
debug.PrintStack()
return 0, os.ErrInvalid
}

Expand Down

0 comments on commit cfccf5e

Please sign in to comment.