Skip to content

Commit

Permalink
etcdserver: process the scenaro of the last WAL record being partiall…
Browse files Browse the repository at this point in the history
…y synced to disk

We need to return io.ErrUnexpectedEOF in the error chain, so that
etcdserver can repair it automatically.

Backport #15068

Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr committed Jan 7, 2023
1 parent 9e3966f commit e1fc545
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 12 deletions.
3 changes: 2 additions & 1 deletion server/etcdserver/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package etcdserver

import (
"errors"
"io"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
Expand Down Expand Up @@ -100,7 +101,7 @@ func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot, unsafeNoFsync b
if wmetadata, st, ents, err = w.ReadAll(); err != nil {
w.Close()
// we can only repair ErrUnexpectedEOF and we never repair twice.
if repaired || err != io.ErrUnexpectedEOF {
if repaired || !errors.Is(err, io.ErrUnexpectedEOF) {
lg.Fatal("failed to read WAL, cannot be repaired", zap.Error(err))
}
if !wal.Repair(lg, waldir) {
Expand Down
4 changes: 2 additions & 2 deletions server/wal/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ func (d *decoder) decodeRecord(rec *walpb.Record) error {
// The length of current WAL entry must be less than the remaining file size.
maxEntryLimit := fileBufReader.FileInfo().Size() - d.lastValidOff - padBytes
if recBytes > maxEntryLimit {
return fmt.Errorf("wal: max entry size limit exceeded, recBytes: %d, fileSize(%d) - offset(%d) - padBytes(%d) = entryLimit(%d)",
recBytes, fileBufReader.FileInfo().Size(), d.lastValidOff, padBytes, maxEntryLimit)
return fmt.Errorf("%w: [wal] max entry size limit exceeded when decoding %q, recBytes: %d, fileSize(%d) - offset(%d) - padBytes(%d) = entryLimit(%d)",
io.ErrUnexpectedEOF, fileBufReader.FileInfo().Name(), recBytes, fileBufReader.FileInfo().Size(), d.lastValidOff, padBytes, maxEntryLimit)
}

data := make([]byte, recBytes+padBytes)
Expand Down
9 changes: 5 additions & 4 deletions server/wal/repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package wal

import (
"errors"
"io"
"os"
"path/filepath"
Expand Down Expand Up @@ -44,8 +45,8 @@ func Repair(lg *zap.Logger, dirpath string) bool {
for {
lastOffset := decoder.lastOffset()
err := decoder.decode(rec)
switch err {
case nil:
switch {
case err == nil:
// update crc of the decoder when necessary
switch rec.Type {
case crcType:
Expand All @@ -59,11 +60,11 @@ func Repair(lg *zap.Logger, dirpath string) bool {
}
continue

case io.EOF:
case errors.Is(err, io.EOF):
lg.Info("repaired", zap.String("path", f.Name()), zap.Error(io.EOF))
return true

case io.ErrUnexpectedEOF:
case errors.Is(err, io.ErrUnexpectedEOF):
bf, bferr := os.Create(f.Name() + ".broken")
if bferr != nil {
lg.Warn("failed to create backup file", zap.String("path", f.Name()+".broken"), zap.Error(bferr))
Expand Down
10 changes: 5 additions & 5 deletions server/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,13 +500,13 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
// We do not have to read out all entries in read mode.
// The last record maybe a partial written one, so
// ErrunexpectedEOF might be returned.
if err != io.EOF && err != io.ErrUnexpectedEOF {
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
state.Reset()
return nil, state, nil, err
}
default:
// We must read all of the entries if WAL is opened in write mode.
if err != io.EOF {
// We must read all the entries if WAL is opened in write mode.
if !errors.Is(err, io.EOF) {
state.Reset()
return nil, state, nil, err
}
Expand Down Expand Up @@ -598,7 +598,7 @@ func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, erro
}
// We do not have to read out all the WAL entries
// as the decoder is opened in read mode.
if err != io.EOF && err != io.ErrUnexpectedEOF {
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
return nil, err
}

Expand Down Expand Up @@ -688,7 +688,7 @@ func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardSta

// We do not have to read out all the WAL entries
// as the decoder is opened in read mode.
if err != io.EOF && err != io.ErrUnexpectedEOF {
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
return nil, err
}

Expand Down
77 changes: 77 additions & 0 deletions server/wal/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package wal
import (
"bytes"
"fmt"
"github.com/stretchr/testify/require"
"io"
"io/ioutil"
"math"
Expand Down Expand Up @@ -1155,3 +1156,79 @@ func TestValidSnapshotEntriesAfterPurgeWal(t *testing.T) {
t.Fatal(err)
}
}

func TestLastRecordLengthExceedFileEnd(t *testing.T) {
/* The data below was generated by code something like below. The length
* of the last record was intentionally changed to 1000 in order to make
* sure it exceeds the end of the file.
*
* for i := 0; i < 3; i++ {
* es := []raftpb.Entry{{Index: uint64(i + 1), Data: []byte(fmt.Sprintf("waldata%d", i+1))}}
* if err = w.Save(raftpb.HardState{}, es); err != nil {
* t.Fatal(err)
* }
* }
* ......
* var sb strings.Builder
* for _, ch := range buf {
* sb.WriteString(fmt.Sprintf("\\x%02x", ch))
* }
*/
// Generate WAL file
t.Log("Generate a WAL file with the last record's length modified.")
data := []byte("\x04\x00\x00\x00\x00\x00\x00\x84\x08\x04\x10\x00\x00" +
"\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x84\x08\x01\x10\x00\x00" +
"\x00\x00\x00\x0e\x00\x00\x00\x00\x00\x00\x82\x08\x05\x10\xa0\xb3" +
"\x9b\x8f\x08\x1a\x04\x08\x00\x10\x00\x00\x00\x1a\x00\x00\x00\x00" +
"\x00\x00\x86\x08\x02\x10\xba\x8b\xdc\x85\x0f\x1a\x10\x08\x00\x10" +
"\x00\x18\x01\x22\x08\x77\x61\x6c\x64\x61\x74\x61\x31\x00\x00\x00" +
"\x00\x00\x00\x1a\x00\x00\x00\x00\x00\x00\x86\x08\x02\x10\xa1\xe8" +
"\xff\x9c\x02\x1a\x10\x08\x00\x10\x00\x18\x02\x22\x08\x77\x61\x6c" +
"\x64\x61\x74\x61\x32\x00\x00\x00\x00\x00\x00\xe8\x03\x00\x00\x00" +
"\x00\x00\x86\x08\x02\x10\xa1\x9c\xa1\xaa\x04\x1a\x10\x08\x00\x10" +
"\x00\x18\x03\x22\x08\x77\x61\x6c\x64\x61\x74\x61\x33\x00\x00\x00" +
"\x00\x00\x00")

buf := bytes.NewBuffer(data)
f, err := createFileWithData(t, buf)
fileName := f.Name()
require.NoError(t, err)
t.Logf("fileName: %v", fileName)

// Verify low-level decoder directly
t.Log("Verify all records can be parsed correctly.")
rec := &walpb.Record{}
decoder := newDecoder(fileutil.NewFileReader(f))
for {
if err = decoder.decode(rec); err != nil {
require.ErrorIs(t, err, io.ErrUnexpectedEOF)
break
}
if rec.Type == entryType {
e := mustUnmarshalEntry(rec.Data)
t.Logf("Validating normal entry: %v", e)
recData := fmt.Sprintf("waldata%d", e.Index)
require.Equal(t, raftpb.EntryNormal, e.Type)
require.Equal(t, recData, string(e.Data))
}
rec = &walpb.Record{}
}
require.NoError(t, f.Close())

// Verify w.ReadAll() returns io.ErrUnexpectedEOF in the error chain.
t.Log("Verify the w.ReadAll returns io.ErrUnexpectedEOF in the error chain")
newFileName := filepath.Join(filepath.Dir(fileName), "0000000000000000-0000000000000000.wal")
require.NoError(t, os.Rename(fileName, newFileName))

w, err := Open(zaptest.NewLogger(t), filepath.Dir(fileName), walpb.Snapshot{
Index: 0,
Term: 0,
})
require.NoError(t, err)
defer w.Close()

_, _, _, err = w.ReadAll()
// Note: The wal file will be repaired automatically in production
// environment, but only once.
require.ErrorIs(t, err, io.ErrUnexpectedEOF)
}

0 comments on commit e1fc545

Please sign in to comment.