Skip to content

Commit

Permalink
Merge pull request etcd-io#5325 from heyitsanthony/fix-partial-wal-init
Browse files Browse the repository at this point in the history
wal: atomically initialize wal directory
  • Loading branch information
Anthony Romano committed May 12, 2016
2 parents 6fc3106 + 1739133 commit 9acb7ab
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 4 deletions.
21 changes: 18 additions & 3 deletions wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,18 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
return nil, os.ErrExist
}

if err := os.MkdirAll(dirpath, privateDirMode); err != nil {
// keep temporary wal directory so WAL initialization appears atomic
tmpdirpath := path.Clean(dirpath) + ".tmp"
if fileutil.Exist(tmpdirpath) {
if err := os.RemoveAll(tmpdirpath); err != nil {
return nil, err
}
}
if err := os.MkdirAll(tmpdirpath, privateDirMode); err != nil {
return nil, err
}

p := path.Join(dirpath, walName(0, 0))
p := path.Join(tmpdirpath, walName(0, 0))
f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return nil, err
Expand All @@ -109,7 +116,6 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
dir: dirpath,
metadata: metadata,
encoder: newEncoder(f, 0),
fp: newFilePipeline(dirpath, segmentSizeBytes),
}
w.locks = append(w.locks, f)
if err := w.saveCrc(0); err != nil {
Expand All @@ -121,6 +127,15 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
if err := w.SaveSnapshot(walpb.Snapshot{}); err != nil {
return nil, err
}

if err := os.RemoveAll(dirpath); err != nil {
return nil, err
}
if err := os.Rename(tmpdirpath, dirpath); err != nil {
return nil, err
}

w.fp = newFilePipeline(w.dir, segmentSizeBytes)
return w, nil
}

Expand Down
38 changes: 37 additions & 1 deletion wal/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestNew(t *testing.T) {
t.Fatal(err)
}
gd := make([]byte, off)
f, err := os.Open(w.tail().Name())
f, err := os.Open(path.Join(p, path.Base(w.tail().Name())))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -600,3 +600,39 @@ func TestTailWriteNoSlackSpace(t *testing.T) {
}
w.Close()
}

// TestRestartCreateWal ensures that an interrupted WAL initialization is clobbered on restart
func TestRestartCreateWal(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(p)

// make temporary directory so it looks like initialization is interrupted
tmpdir := path.Clean(p) + ".tmp"
if err = os.Mkdir(p+".tmp", 0755); err != nil {
t.Fatal(err)
}
if _, err = os.OpenFile(path.Join(tmpdir, "test"), os.O_WRONLY|os.O_CREATE, 0600); err != nil {
t.Fatal(err)
}

w, werr := Create(p, []byte("abc"))
if werr != nil {
t.Fatal(werr)
}
w.Close()
if Exist(tmpdir) {
t.Fatalf("got %q exists, expected it to not exist", tmpdir)
}

if w, err = OpenForRead(p, walpb.Snapshot{}); err != nil {
t.Fatal(err)
}
defer w.Close()

if meta, _, _, rerr := w.ReadAll(); rerr != nil || string(meta) != "abc" {
t.Fatalf("got error %v and meta %q, expected nil and %q", rerr, meta, "abc")
}
}

0 comments on commit 9acb7ab

Please sign in to comment.