diff --git a/checkpoint.go b/checkpoint.go index 87ff5597..d988d356 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -37,7 +37,7 @@ type CheckpointStats struct { DroppedTombstones int TotalSeries int // Processed series including dropped ones. TotalSamples int // Processed samples inlcuding dropped ones. - TotalTombstones int // Processed tombstones including droppes ones. + TotalTombstones int // Processed tombstones including dropped ones. } // LastCheckpoint returns the directory name of the most recent checkpoint. @@ -260,8 +260,8 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo if err := cp.Close(); err != nil { return nil, errors.Wrap(err, "close checkpoint") } - if err := fileutil.Rename(cpdirtmp, cpdir); err != nil { - return nil, errors.Wrap(err, "rename checkpoint file") + if err := fileutil.Replace(cpdirtmp, cpdir); err != nil { + return nil, errors.Wrap(err, "rename checkpoint directory") } if err := w.Truncate(n + 1); err != nil { // If truncating fails, we'll just try again at the next checkpoint. diff --git a/fileutil/fileutil.go b/fileutil/fileutil.go index a3eb2a7a..2158bfd2 100644 --- a/fileutil/fileutil.go +++ b/fileutil/fileutil.go @@ -43,3 +43,26 @@ func Rename(from, to string) error { } return pdir.Close() } + +// Replace moves a file or directory to a new location and deletes any previous data. +// It is not atomic. +func Replace(from, to string) error { + if err := os.RemoveAll(to); err != nil { + return nil + } + if err := os.Rename(from, to); err != nil { + return err + } + + // Directory was renamed; sync parent dir to persist rename. + pdir, err := OpenDir(filepath.Dir(to)) + if err != nil { + return err + } + + if err = Fsync(pdir); err != nil { + pdir.Close() + return err + } + return pdir.Close() +} diff --git a/wal/wal.go b/wal/wal.go index c2b333da..e59b0e15 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -393,11 +393,11 @@ func (w *WAL) flushPage(clear bool) error { type recType uint8 const ( - recPageTerm recType = 0 // rest of page is empty - recFull recType = 1 // full record - recFirst recType = 2 // first fragment of a record - recMiddle recType = 3 // middle fragments of a record - recLast recType = 4 // final fragment of a record + recPageTerm recType = 0 // Rest of page is empty. + recFull recType = 1 // Full record. + recFirst recType = 2 // First fragment of a record. + recMiddle recType = 3 // Middle fragments of a record. + recLast recType = 4 // Final fragment of a record. ) func (t recType) String() string { @@ -442,8 +442,8 @@ func (w *WAL) log(rec []byte, final bool) error { // If the record is too big to fit within pages in the current // segment, terminate the active segment and advance to the next one. // This ensures that records do not cross segment boundaries. - left := w.page.remaining() - recordHeaderSize // active page - left += (pageSize - recordHeaderSize) * (w.pagesPerSegment() - w.donePages - 1) // free pages + left := w.page.remaining() - recordHeaderSize // Active pages. + left += (pageSize - recordHeaderSize) * (w.pagesPerSegment() - w.donePages - 1) // Free pages. if len(rec) > left { if err := w.nextSegment(); err != nil { @@ -716,7 +716,7 @@ func (r *Reader) next() (err error) { // It's not strictly necessary but may catch sketchy state early. k := pageSize - (r.total % pageSize) if k == pageSize { - continue // initial 0 byte was last page byte + continue // Initial 0 byte was last page byte. } n, err := io.ReadFull(r.rdr, buf[:k]) if err != nil {