From e1fc545d8a2e189f68e5bd9090a1949b3ea703d7 Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Sun, 8 Jan 2023 05:30:01 +0800 Subject: [PATCH] etcdserver: process the scenario of the last WAL record being partially synced to disk We need to return io.ErrUnexpectedEOF in the error chain, so that etcdserver can repair it automatically. Backport https://github.com/etcd-io/etcd/pull/15068 Signed-off-by: Benjamin Wang --- server/etcdserver/storage.go | 3 +- server/wal/decoder.go | 4 +- server/wal/repair.go | 9 +++-- server/wal/wal.go | 10 ++--- server/wal/wal_test.go | 77 ++++++++++++++++++++++++++++++++++++ 5 files changed, 91 insertions(+), 12 deletions(-) diff --git a/server/etcdserver/storage.go b/server/etcdserver/storage.go index e662537d368..64fc862c0c2 100644 --- a/server/etcdserver/storage.go +++ b/server/etcdserver/storage.go @@ -15,6 +15,7 @@ package etcdserver import ( + "errors" "io" pb "go.etcd.io/etcd/api/v3/etcdserverpb" @@ -100,7 +101,7 @@ func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot, unsafeNoFsync b if wmetadata, st, ents, err = w.ReadAll(); err != nil { w.Close() // we can only repair ErrUnexpectedEOF and we never repair twice. - if repaired || err != io.ErrUnexpectedEOF { + if repaired || !errors.Is(err, io.ErrUnexpectedEOF) { lg.Fatal("failed to read WAL, cannot be repaired", zap.Error(err)) } if !wal.Repair(lg, waldir) { diff --git a/server/wal/decoder.go b/server/wal/decoder.go index 2656d286ac2..b8c68bef6d4 100644 --- a/server/wal/decoder.go +++ b/server/wal/decoder.go @@ -84,8 +84,8 @@ func (d *decoder) decodeRecord(rec *walpb.Record) error { // The length of current WAL entry must be less than the remaining file size. 
maxEntryLimit := fileBufReader.FileInfo().Size() - d.lastValidOff - padBytes if recBytes > maxEntryLimit { - return fmt.Errorf("wal: max entry size limit exceeded, recBytes: %d, fileSize(%d) - offset(%d) - padBytes(%d) = entryLimit(%d)", - recBytes, fileBufReader.FileInfo().Size(), d.lastValidOff, padBytes, maxEntryLimit) + return fmt.Errorf("%w: [wal] max entry size limit exceeded when decoding %q, recBytes: %d, fileSize(%d) - offset(%d) - padBytes(%d) = entryLimit(%d)", + io.ErrUnexpectedEOF, fileBufReader.FileInfo().Name(), recBytes, fileBufReader.FileInfo().Size(), d.lastValidOff, padBytes, maxEntryLimit) } data := make([]byte, recBytes+padBytes) diff --git a/server/wal/repair.go b/server/wal/repair.go index 0ed8425463a..dec2fa240b6 100644 --- a/server/wal/repair.go +++ b/server/wal/repair.go @@ -15,6 +15,7 @@ package wal import ( + "errors" "io" "os" "path/filepath" @@ -44,8 +45,8 @@ func Repair(lg *zap.Logger, dirpath string) bool { for { lastOffset := decoder.lastOffset() err := decoder.decode(rec) - switch err { - case nil: + switch { + case err == nil: // update crc of the decoder when necessary switch rec.Type { case crcType: @@ -59,11 +60,11 @@ func Repair(lg *zap.Logger, dirpath string) bool { } continue - case io.EOF: + case errors.Is(err, io.EOF): lg.Info("repaired", zap.String("path", f.Name()), zap.Error(io.EOF)) return true - case io.ErrUnexpectedEOF: + case errors.Is(err, io.ErrUnexpectedEOF): bf, bferr := os.Create(f.Name() + ".broken") if bferr != nil { lg.Warn("failed to create backup file", zap.String("path", f.Name()+".broken"), zap.Error(bferr)) diff --git a/server/wal/wal.go b/server/wal/wal.go index 01d0c28d6ba..2907e52c681 100644 --- a/server/wal/wal.go +++ b/server/wal/wal.go @@ -500,13 +500,13 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. // We do not have to read out all entries in read mode. // The last record maybe a partial written one, so // ErrunexpectedEOF might be returned. 
- if err != io.EOF && err != io.ErrUnexpectedEOF { + if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) { state.Reset() return nil, state, nil, err } default: - // We must read all of the entries if WAL is opened in write mode. - if err != io.EOF { + // We must read all the entries if WAL is opened in write mode. + if !errors.Is(err, io.EOF) { state.Reset() return nil, state, nil, err } @@ -598,7 +598,7 @@ func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, erro } // We do not have to read out all the WAL entries // as the decoder is opened in read mode. - if err != io.EOF && err != io.ErrUnexpectedEOF { + if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) { return nil, err } @@ -688,7 +688,7 @@ func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardSta // We do not have to read out all the WAL entries // as the decoder is opened in read mode. - if err != io.EOF && err != io.ErrUnexpectedEOF { + if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) { return nil, err } diff --git a/server/wal/wal_test.go b/server/wal/wal_test.go index 879f76c2a16..2ea6f2a621c 100644 --- a/server/wal/wal_test.go +++ b/server/wal/wal_test.go @@ -17,6 +17,7 @@ package wal import ( "bytes" "fmt" + "github.com/stretchr/testify/require" "io" "io/ioutil" "math" @@ -1155,3 +1156,79 @@ func TestValidSnapshotEntriesAfterPurgeWal(t *testing.T) { t.Fatal(err) } } + +func TestLastRecordLengthExceedFileEnd(t *testing.T) { + /* The data below was generated by code something like below. The length + * of the last record was intentionally changed to 1000 in order to make + * sure it exceeds the end of the file. + * + * for i := 0; i < 3; i++ { + * es := []raftpb.Entry{{Index: uint64(i + 1), Data: []byte(fmt.Sprintf("waldata%d", i+1))}} + * if err = w.Save(raftpb.HardState{}, es); err != nil { + * t.Fatal(err) + * } + * } + * ...... 
+ * var sb strings.Builder + * for _, ch := range buf { + * sb.WriteString(fmt.Sprintf("\\x%02x", ch)) + * } + */ + // Generate WAL file + t.Log("Generate a WAL file with the last record's length modified.") + data := []byte("\x04\x00\x00\x00\x00\x00\x00\x84\x08\x04\x10\x00\x00" + + "\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x84\x08\x01\x10\x00\x00" + + "\x00\x00\x00\x0e\x00\x00\x00\x00\x00\x00\x82\x08\x05\x10\xa0\xb3" + + "\x9b\x8f\x08\x1a\x04\x08\x00\x10\x00\x00\x00\x1a\x00\x00\x00\x00" + + "\x00\x00\x86\x08\x02\x10\xba\x8b\xdc\x85\x0f\x1a\x10\x08\x00\x10" + + "\x00\x18\x01\x22\x08\x77\x61\x6c\x64\x61\x74\x61\x31\x00\x00\x00" + + "\x00\x00\x00\x1a\x00\x00\x00\x00\x00\x00\x86\x08\x02\x10\xa1\xe8" + + "\xff\x9c\x02\x1a\x10\x08\x00\x10\x00\x18\x02\x22\x08\x77\x61\x6c" + + "\x64\x61\x74\x61\x32\x00\x00\x00\x00\x00\x00\xe8\x03\x00\x00\x00" + + "\x00\x00\x86\x08\x02\x10\xa1\x9c\xa1\xaa\x04\x1a\x10\x08\x00\x10" + + "\x00\x18\x03\x22\x08\x77\x61\x6c\x64\x61\x74\x61\x33\x00\x00\x00" + + "\x00\x00\x00") + + buf := bytes.NewBuffer(data) + f, err := createFileWithData(t, buf) + fileName := f.Name() + require.NoError(t, err) + t.Logf("fileName: %v", fileName) + + // Verify low-level decoder directly + t.Log("Verify all records can be parsed correctly.") + rec := &walpb.Record{} + decoder := newDecoder(fileutil.NewFileReader(f)) + for { + if err = decoder.decode(rec); err != nil { + require.ErrorIs(t, err, io.ErrUnexpectedEOF) + break + } + if rec.Type == entryType { + e := mustUnmarshalEntry(rec.Data) + t.Logf("Validating normal entry: %v", e) + recData := fmt.Sprintf("waldata%d", e.Index) + require.Equal(t, raftpb.EntryNormal, e.Type) + require.Equal(t, recData, string(e.Data)) + } + rec = &walpb.Record{} + } + require.NoError(t, f.Close()) + + // Verify w.ReadAll() returns io.ErrUnexpectedEOF in the error chain. 
+ t.Log("Verify the w.ReadAll returns io.ErrUnexpectedEOF in the error chain") + newFileName := filepath.Join(filepath.Dir(fileName), "0000000000000000-0000000000000000.wal") + require.NoError(t, os.Rename(fileName, newFileName)) + + w, err := Open(zaptest.NewLogger(t), filepath.Dir(fileName), walpb.Snapshot{ + Index: 0, + Term: 0, + }) + require.NoError(t, err) + defer w.Close() + + _, _, _, err = w.ReadAll() + // Note: The wal file will be repaired automatically in production + // environment, but only once. + require.ErrorIs(t, err, io.ErrUnexpectedEOF) +}