diff --git a/wal/wal.go b/wal/wal.go index 1f55173f0a0..77fefb7bb43 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -89,11 +89,18 @@ func Create(dirpath string, metadata []byte) (*WAL, error) { return nil, os.ErrExist } - if err := os.MkdirAll(dirpath, privateDirMode); err != nil { + // keep temporary wal directory so WAL initialization appears atomic + tmpdirpath := path.Clean(dirpath) + ".tmp" + if fileutil.Exist(tmpdirpath) { + if err := os.RemoveAll(tmpdirpath); err != nil { + return nil, err + } + } + if err := os.MkdirAll(tmpdirpath, privateDirMode); err != nil { return nil, err } - p := path.Join(dirpath, walName(0, 0)) + p := path.Join(tmpdirpath, walName(0, 0)) f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, 0600) if err != nil { return nil, err @@ -109,7 +116,6 @@ func Create(dirpath string, metadata []byte) (*WAL, error) { dir: dirpath, metadata: metadata, encoder: newEncoder(f, 0), - fp: newFilePipeline(dirpath, segmentSizeBytes), } w.locks = append(w.locks, f) if err := w.saveCrc(0); err != nil { @@ -121,6 +127,15 @@ func Create(dirpath string, metadata []byte) (*WAL, error) { if err := w.SaveSnapshot(walpb.Snapshot{}); err != nil { return nil, err } + + if err := os.RemoveAll(dirpath); err != nil { + return nil, err + } + if err := os.Rename(tmpdirpath, dirpath); err != nil { + return nil, err + } + + w.fp = newFilePipeline(w.dir, segmentSizeBytes) return w, nil } diff --git a/wal/wal_test.go b/wal/wal_test.go index 9045c557d90..49fbddefe46 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -50,7 +50,7 @@ func TestNew(t *testing.T) { t.Fatal(err) } gd := make([]byte, off) - f, err := os.Open(w.tail().Name()) + f, err := os.Open(path.Join(p, path.Base(w.tail().Name()))) if err != nil { t.Fatal(err) } @@ -600,3 +600,39 @@ func TestTailWriteNoSlackSpace(t *testing.T) { } w.Close() } + +// TestRestartCreateWal ensures that an interrupted WAL initialization is clobbered on restart +func TestRestartCreateWal(t *testing.T) { + p, err := ioutil.TempDir(os.TempDir(), "waltest") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(p) + + // make temporary directory so it looks like initialization is interrupted + tmpdir := path.Clean(p) + ".tmp" + if err = os.Mkdir(p+".tmp", 0755); err != nil { + t.Fatal(err) + } + if _, err = os.OpenFile(path.Join(tmpdir, "test"), os.O_WRONLY|os.O_CREATE, 0600); err != nil { + t.Fatal(err) + } + + w, werr := Create(p, []byte("abc")) + if werr != nil { + t.Fatal(werr) + } + w.Close() + if Exist(tmpdir) { + t.Fatalf("got %q exists, expected it to not exist", tmpdir) + } + + if w, err = OpenForRead(p, walpb.Snapshot{}); err != nil { + t.Fatal(err) + } + defer w.Close() + + if meta, _, _, rerr := w.ReadAll(); rerr != nil || string(meta) != "abc" { + t.Fatalf("got error %v and meta %q, expected nil and %q", rerr, meta, "abc") + } +}