diff --git a/carlog/carlog.go b/carlog/carlog.go index edfe1c6..00c92d0 100644 --- a/carlog/carlog.go +++ b/carlog/carlog.go @@ -10,6 +10,7 @@ import ( "math/bits" "os" "path/filepath" + "runtime/debug" "sync" "sync/atomic" "time" @@ -277,46 +278,31 @@ func Open(staging CarStorageProvider, indexPath, dataPath string, tc TruncCleanu }, nil } - if h.External { - idx, err := OpenBSSTIndex(filepath.Join(indexPath, BsstIndexCanon)) + // open data + dataFile, err := os.OpenFile(blkLogPath, os.O_RDWR|os.O_SYNC, 0666) + noDataFile := os.IsNotExist(err) && h.External && !h.Offloaded // External files may have data available only on external storage before being only available on Filecoin + var dataLen int64 + + if !noDataFile { if err != nil { - return nil, xerrors.Errorf("opening bsst index: %w", err) + return nil, xerrors.Errorf("opening data: %w", err) } - if staging == nil { - return nil, xerrors.Errorf("carlog: External but staging storage not set") + // check if data needs to be replayed/truncated + dataInfo, err := dataFile.Stat() + if err != nil { + return nil, xerrors.Errorf("stat data len: %w", err) } - return &CarLog{ - staging: staging, - - head: headFile, - - IndexPath: indexPath, - DataPath: dataPath, - - eIdx: idx, + if dataInfo.Size() < h.RetiredAt { + // something is not yes - layerOffsets: h.LayerOffsets, - }, nil - } - - // open data - dataFile, err := os.OpenFile(blkLogPath, os.O_RDWR|os.O_SYNC, 0666) - if err != nil { - return nil, xerrors.Errorf("opening data: %w", err) - } - - // check if data needs to be replayed/truncated - dataInfo, err := dataFile.Stat() - if err != nil { - return nil, xerrors.Errorf("stat data len: %w", err) - } - - if dataInfo.Size() < h.RetiredAt { - // something is not yes + return nil, xerrors.Errorf("data file is shorter than head says it should be (%d < %d)", dataInfo.Size(), h.RetiredAt) + } - return nil, xerrors.Errorf("data file is shorter than head says it should be (%d < %d)", dataInfo.Size(), h.RetiredAt) + dataLen = dataInfo.Size() + } else { + dataFile = nil // it's almost definitely nil already, but just to be sure } jb := &CarLog{ @@ -326,7 +312,7 @@ func Open(staging CarStorageProvider, indexPath, dataPath string, tc TruncCleanu DataPath: dataPath, head: headFile, data: dataFile, - dataLen: dataInfo.Size(), + dataLen: dataLen, dataStart: h.DataStart, dataEnd: h.DataEnd, @@ -336,67 +322,86 @@ func Open(staging CarStorageProvider, indexPath, dataPath string, tc TruncCleanu } // open index - if h.Finalized { - // bsst, read only - idx, err := OpenBSSTIndex(filepath.Join(indexPath, BsstIndex)) + if h.External { + // External but not offloaded - we still have the data local, in need of commp + + idx, err := OpenBSSTIndex(filepath.Join(indexPath, BsstIndexCanon)) if err != nil { return nil, xerrors.Errorf("opening bsst index: %w", err) } - jb.rIdx = idx - } else { - idx, err := OpenLevelDBIndex(filepath.Join(indexPath, LevelIndex), false) - if err != nil && !os.IsNotExist(err) { - return nil, xerrors.Errorf("opening leveldb index: %w", err) + if staging == nil { + return nil, xerrors.Errorf("carlog: External but staging storage not set") } - if os.IsNotExist(err) { - log.Errorw("leveldb index missing, attempting to fix", "error", err, "path", filepath.Join(indexPath, LevelIndex)) - idx, err = OpenLevelDBIndex(filepath.Join(indexPath, LevelIndex+".temp"), true) + jb.eIdx = idx + } else { + if h.Finalized { + // bsst, read only + idx, err := OpenBSSTIndex(filepath.Join(indexPath, BsstIndex)) if err != nil { - return nil, xerrors.Errorf("creating (fixLevelIndex) leveldb index: %w", err) + return nil, xerrors.Errorf("opening bsst index: %w", err) } - err := jb.fixLevelIndex(h, idx) - if err != nil { - log.Errorw("fixing leveldb index failed", "error", err, "path", filepath.Join(indexPath, LevelIndex)) - return nil, xerrors.Errorf("fixing leveldb index: %w", err) + jb.rIdx = idx + } else { + idx, err := OpenLevelDBIndex(filepath.Join(indexPath, LevelIndex), false) + if err != nil && !os.IsNotExist(err) { + return nil, xerrors.Errorf("opening leveldb index: %w", err) } + if os.IsNotExist(err) { + log.Errorw("leveldb index missing, attempting to fix", "error", err, "path", filepath.Join(indexPath, LevelIndex)) - if err := idx.Close(); err != nil { - log.Errorw("closing temp level index failed", "error", err, "path", filepath.Join(indexPath, LevelIndex)) - return nil, xerrors.Errorf("closing temp level index: %w", err) - } + idx, err = OpenLevelDBIndex(filepath.Join(indexPath, LevelIndex+".temp"), true) + if err != nil { + return nil, xerrors.Errorf("creating (fixLevelIndex) leveldb index: %w", err) + } - if err := os.RemoveAll(filepath.Join(indexPath, LevelIndex)); err != nil { - log.Errorw("removing old level index failed", "error", err, "path", filepath.Join(indexPath, LevelIndex)) - return nil, xerrors.Errorf("removing old level index: %w", err) - } + err := jb.fixLevelIndex(h, idx) + if err != nil { + log.Errorw("fixing leveldb index failed", "error", err, "path", filepath.Join(indexPath, LevelIndex)) + return nil, xerrors.Errorf("fixing leveldb index: %w", err) + } - if err := os.Rename(filepath.Join(indexPath, LevelIndex+".temp"), filepath.Join(indexPath, LevelIndex)); err != nil { - log.Errorw("renaming temp level index failed", "error", err, "path", filepath.Join(indexPath, LevelIndex)) - return nil, xerrors.Errorf("renaming temp level index: %w", err) - } + if err := idx.Close(); err != nil { + log.Errorw("closing temp level index failed", "error", err, "path", filepath.Join(indexPath, LevelIndex)) + return nil, xerrors.Errorf("closing temp level index: %w", err) + } - idx, err = OpenLevelDBIndex(filepath.Join(indexPath, LevelIndex), false) - if err != nil { - log.Errorw("opening fixed leveldb index failed", "error", err, "path", filepath.Join(indexPath, LevelIndex)) - return nil, xerrors.Errorf("opening fixed leveldb index: %w", err) - } + if err := os.RemoveAll(filepath.Join(indexPath, LevelIndex)); err != nil { + log.Errorw("removing old level index failed", "error", err, "path", filepath.Join(indexPath, LevelIndex)) + return nil, xerrors.Errorf("removing old level index: %w", err) + } - log.Errorw("leveldb index fixed", "path", filepath.Join(indexPath, LevelIndex)) - } + if err := os.Rename(filepath.Join(indexPath, LevelIndex+".temp"), filepath.Join(indexPath, LevelIndex)); err != nil { + log.Errorw("renaming temp level index failed", "error", err, "path", filepath.Join(indexPath, LevelIndex)) + return nil, xerrors.Errorf("renaming temp level index: %w", err) + } - jb.rIdx = idx + idx, err = OpenLevelDBIndex(filepath.Join(indexPath, LevelIndex), false) + if err != nil { + log.Errorw("opening fixed leveldb index failed", "error", err, "path", filepath.Join(indexPath, LevelIndex)) + return nil, xerrors.Errorf("opening fixed leveldb index: %w", err) + } - if h.ReadOnly { - // todo start finalize - // (this should happen through group mgr) - } else { - jb.wIdx = idx + log.Errorw("leveldb index fixed", "path", filepath.Join(indexPath, LevelIndex)) + } + + jb.rIdx = idx + + if h.ReadOnly { + // todo start finalize + // (this should happen through group mgr) + } else { + jb.wIdx = idx + } } } + if noDataFile { // external, not fully offloaded, no local data file + return jb, nil + } + if h.DataEnd > 0 && len(h.LayerOffsets) == 0 { if err := dataFile.Truncate(h.DataEnd); err != nil { return nil, xerrors.Errorf("truncate data file (cut unfinished top tree): %w", err) @@ -412,25 +417,26 @@ func Open(staging CarStorageProvider, indexPath, dataPath string, tc TruncCleanu return nil, xerrors.Errorf("mutating head: %w", err) } - dataInfo, err = dataFile.Stat() + dataInfo, err := dataFile.Stat() if err != nil { return nil, xerrors.Errorf("stat top-trunc data len: %w", err) } + dataLen = dataInfo.Size() } // todo right now we're calling this on every startup when writable // a better way to do that would be to have a track that we're in a "clean" // state, or even to have two indexes, one for clean and one for dirty data - if dataInfo.Size() > h.RetiredAt || jb.wIdx != nil { // data ahead means there was an unclean shutdown during a write + if dataLen > h.RetiredAt || jb.wIdx != nil { // data ahead means there was an unclean shutdown during a write if h.ReadOnly { // If the file is truncated while being marked as read-only, something is terribly wrong. - return nil, xerrors.Errorf("read only(!) data file is shorter than head says it should be (%d < %d)", dataInfo.Size(), h.RetiredAt) + return nil, xerrors.Errorf("read only(!) data file is shorter than head says it should be (%d < %d)", dataLen, h.RetiredAt) } //return nil, xerrors.Errorf("data file is shorter than head says it should be (%d < %d)", dataInfo.Size(), h.RetiredAt) // truncate index - err := jb.truncate(h.RetiredAt, dataInfo.Size(), tc) + err := jb.truncate(h.RetiredAt, dataLen, tc) if err != nil { return nil, xerrors.Errorf("truncating jbob: %w", err) } @@ -1544,7 +1550,7 @@ func (j *CarLog) WriteCar(w io.Writer) (int64, cid.Cid, error) { // read root block, which is the only block in the last layer rcid, node, err := carutil.ReadNode(layers[len(layers)-1].br) if err != nil { - return 0, cid.Undef, xerrors.Errorf("reading root block: %w", err) + return 0, cid.Undef, xerrors.Errorf("reading root block (lo: %#v; rsat: %d, rsbase: %d; data: %v): %w", j.layerOffsets, layers[len(layers)-1].rs.(*readSeekerFromReaderAt).pos, layers[len(layers)-1].rs.(*readSeekerFromReaderAt).base, j.data, err) } // todo consider buffering the writes @@ -1927,6 +1933,10 @@ type readSeekerFromReaderAt struct { func (rs *readSeekerFromReaderAt) Read(p []byte) (n int, err error) { n, err = rs.readerAt.ReadAt(p, rs.pos+rs.base) rs.pos += int64(n) + if err != nil && err != io.EOF { + log.Errorw("READ SEEKER AT ERROR", "err", err, "plen", len(p), "pos", rs.pos, "base", rs.base, "n", n) + debug.PrintStack() + } return n, err } @@ -1939,6 +1949,8 @@ func (rs *readSeekerFromReaderAt) Seek(offset int64, whence int) (int64, error) case io.SeekEnd: return 0, io.ErrUnexpectedEOF default: + log.Errorw("READ SEEKER AT INVALID WHENCE", "whence", whence) + debug.PrintStack() return 0, os.ErrInvalid }