Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
Merge pull request #340 from prometheus/wal_migrate
Browse files Browse the repository at this point in the history
Migrate write ahead log
  • Loading branch information
fabxc authored Aug 2, 2018
2 parents a9a8fab + ee7ee05 commit 7699051
Show file tree
Hide file tree
Showing 5 changed files with 225 additions and 1 deletion.
4 changes: 4 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
if err := repairBadIndexVersion(l, dir); err != nil {
return nil, err
}
// Migrate old WAL.
if err := MigrateWAL(l, filepath.Join(dir, "wal")); err != nil {
return nil, errors.Wrap(err, "migrate WAL")
}

db = &DB{
dir: dir,
Expand Down
2 changes: 2 additions & 0 deletions head.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,8 @@ func (h *Head) Init() error {
if err == nil {
return nil
}
level.Warn(h.logger).Log("msg", "encountered WAL error, attempting repair", "err", err)

if err := h.wal.Repair(err); err != nil {
return errors.Wrap(err, "repair corrupted WAL")
}
Expand Down
2 changes: 1 addition & 1 deletion repair_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestRepairBadIndexVersion(t *testing.T) {
}

// On DB opening all blocks in the base dir should be repaired.
db, _ := Open("testdata/repair_index_version", nil, nil, nil)
db, err := Open("testdata/repair_index_version", nil, nil, nil)
if err != nil {
t.Fatal(err)
}
Expand Down
102 changes: 102 additions & 0 deletions wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/wal"
)

// WALEntryType indicates what data a WAL entry contains.
Expand Down Expand Up @@ -82,6 +83,8 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics {

// WAL is a write ahead log that can log new series labels and samples.
// It must be completely read before new entries are logged.
//
// DEPRECATED: use wal pkg combined with the record codex instead.
type WAL interface {
Reader() WALReader
LogSeries([]RefSeries) error
Expand Down Expand Up @@ -173,6 +176,8 @@ func newCRC32() hash.Hash32 {
}

// SegmentWAL is a write ahead log for series data.
//
// DEPRECATED: use wal pkg combined with the record coders instead.
type SegmentWAL struct {
mtx sync.Mutex
metrics *walMetrics
Expand Down Expand Up @@ -1206,3 +1211,100 @@ func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error {
}
return nil
}

// MigrateWAL rewrites the deprecated write ahead log into the new format.
func MigrateWAL(logger log.Logger, dir string) (err error) {
if logger == nil {
logger = log.NewNopLogger()
}
// Detect whether we still have the old WAL.
fns, err := sequenceFiles(dir)
if err != nil && !os.IsNotExist(err) {
return errors.Wrap(err, "list sequence files")
}
if len(fns) == 0 {
return nil // No WAL at all yet.
}
// Check header of first segment to see whether we are still dealing with an
// old WAL.
f, err := os.Open(fns[0])
if err != nil {
return errors.Wrap(err, "check first existing segment")
}
defer f.Close()

var hdr [4]byte
if _, err := f.Read(hdr[:]); err != nil && err != io.EOF {
return errors.Wrap(err, "read header from first segment")
}
// If we cannot read the magic header for segments of the old WAL, abort.
// Either it's migrated already or there's a corruption issue with which
// we cannot deal here anyway. Subsequent attempts to open the WAL will error in that case.
if binary.BigEndian.Uint32(hdr[:]) != WALMagic {
return nil
}

level.Info(logger).Log("msg", "migrating WAL format")

tmpdir := dir + ".tmp"
if err := os.RemoveAll(tmpdir); err != nil {
return errors.Wrap(err, "cleanup replacement dir")
}
repl, err := wal.New(logger, nil, tmpdir)
if err != nil {
return errors.Wrap(err, "open new WAL")
}
// It should've already been closed as part of the previous finalization.
// Do it once again in case of prior errors.
defer func() {
if err != nil {
repl.Close()
}
}()

w, err := OpenSegmentWAL(dir, logger, time.Minute, nil)
if err != nil {
return errors.Wrap(err, "open old WAL")
}
defer w.Close()

rdr := w.Reader()

var (
enc RecordEncoder
b []byte
)
decErr := rdr.Read(
func(s []RefSeries) {
if err != nil {
return
}
err = repl.Log(enc.Series(s, b[:0]))
},
func(s []RefSample) {
if err != nil {
return
}
err = repl.Log(enc.Samples(s, b[:0]))
},
func(s []Stone) {
if err != nil {
return
}
err = repl.Log(enc.Tombstones(s, b[:0]))
},
)
if decErr != nil {
return errors.Wrap(err, "decode old entries")
}
if err != nil {
return errors.Wrap(err, "write new entries")
}
if err := repl.Close(); err != nil {
return errors.Wrap(err, "close new WAL")
}
if err := fileutil.Rename(tmpdir, dir); err != nil {
return errors.Wrap(err, "replace old WAL")
}
return nil
}
116 changes: 116 additions & 0 deletions wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ import (
"io/ioutil"
"math/rand"
"os"
"path"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
"github.com/prometheus/tsdb/wal"
)

func TestSegmentWAL_cut(t *testing.T) {
Expand Down Expand Up @@ -431,3 +433,117 @@ func TestWALRestoreCorrupted(t *testing.T) {
})
}
}

func TestMigrateWAL_Empty(t *testing.T) {
// The migration proecedure must properly deal with a zero-length segment,
// which is valid in the new format.
dir, err := ioutil.TempDir("", "walmigrate")
testutil.Ok(t, err)
defer os.RemoveAll(dir)

wdir := path.Join(dir, "wal")

// Initialize empty WAL.
w, err := wal.New(nil, nil, wdir)
testutil.Ok(t, err)
testutil.Ok(t, w.Close())

testutil.Ok(t, MigrateWAL(nil, wdir))
}

func TestMigrateWAL_Fuzz(t *testing.T) {
dir, err := ioutil.TempDir("", "walmigrate")
testutil.Ok(t, err)
defer os.RemoveAll(dir)

wdir := path.Join(dir, "wal")

// Should pass if no WAL exists yet.
testutil.Ok(t, MigrateWAL(nil, wdir))

oldWAL, err := OpenSegmentWAL(wdir, nil, time.Minute, nil)
testutil.Ok(t, err)

// Write some data.
testutil.Ok(t, oldWAL.LogSeries([]RefSeries{
{Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")},
{Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")},
}))
testutil.Ok(t, oldWAL.LogSamples([]RefSample{
{Ref: 1, T: 100, V: 200},
{Ref: 2, T: 300, V: 400},
}))
testutil.Ok(t, oldWAL.LogSeries([]RefSeries{
{Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")},
}))
testutil.Ok(t, oldWAL.LogSamples([]RefSample{
{Ref: 3, T: 100, V: 200},
{Ref: 4, T: 300, V: 400},
}))
testutil.Ok(t, oldWAL.LogDeletes([]Stone{
{ref: 1, intervals: []Interval{{100, 200}}},
}))

testutil.Ok(t, oldWAL.Close())

// Perform migration.
testutil.Ok(t, MigrateWAL(nil, wdir))

w, err := wal.New(nil, nil, wdir)
testutil.Ok(t, err)

// We can properly write some new data after migration.
var enc RecordEncoder
testutil.Ok(t, w.Log(enc.Samples([]RefSample{
{Ref: 500, T: 1, V: 1},
}, nil)))

testutil.Ok(t, w.Close())

// Read back all data.
sr, err := wal.NewSegmentsReader(wdir)
testutil.Ok(t, err)

r := wal.NewReader(sr)
var res []interface{}
var dec RecordDecoder

for r.Next() {
rec := r.Record()

switch dec.Type(rec) {
case RecordSeries:
s, err := dec.Series(rec, nil)
testutil.Ok(t, err)
res = append(res, s)
case RecordSamples:
s, err := dec.Samples(rec, nil)
testutil.Ok(t, err)
res = append(res, s)
case RecordTombstones:
s, err := dec.Tombstones(rec, nil)
testutil.Ok(t, err)
res = append(res, s)
default:
t.Fatalf("unknown record type %d", dec.Type(rec))
}
}
testutil.Ok(t, r.Err())

testutil.Equals(t, []interface{}{
[]RefSeries{
{Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")},
{Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")},
},
[]RefSample{{Ref: 1, T: 100, V: 200}, {Ref: 2, T: 300, V: 400}},
[]RefSeries{
{Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")},
},
[]RefSample{{Ref: 3, T: 100, V: 200}, {Ref: 4, T: 300, V: 400}},
[]Stone{{ref: 1, intervals: []Interval{{100, 200}}}},
[]RefSample{{Ref: 500, T: 1, V: 1}},
}, res)

// Migrating an already migrated WAL shouldn't do anything.
testutil.Ok(t, MigrateWAL(nil, wdir))
}

0 comments on commit 7699051

Please sign in to comment.