diff --git a/e2e/ctl_v3_snapshot_test.go b/e2e/ctl_v3_snapshot_test.go index ed09045ebc2..79e601239a2 100644 --- a/e2e/ctl_v3_snapshot_test.go +++ b/e2e/ctl_v3_snapshot_test.go @@ -27,6 +27,7 @@ import ( "github.com/coreos/etcd/pkg/expect" "github.com/coreos/etcd/pkg/testutil" + "github.com/coreos/etcd/snapshot" ) func TestCtlV3Snapshot(t *testing.T) { testCtl(t, snapshotTest) } @@ -127,33 +128,26 @@ func ctlV3SnapshotSave(cx ctlCtx, fpath string) error { return spawnWithExpect(cmdArgs, fmt.Sprintf("Snapshot saved at %s", fpath)) } -type snapshotStatus struct { - Hash uint32 `json:"hash"` - Revision int64 `json:"revision"` - TotalKey int `json:"totalKey"` - TotalSize int64 `json:"totalSize"` -} - -func getSnapshotStatus(cx ctlCtx, fpath string) (snapshotStatus, error) { +func getSnapshotStatus(cx ctlCtx, fpath string) (snapshot.Status, error) { cmdArgs := append(cx.PrefixArgs(), "--write-out", "json", "snapshot", "status", fpath) proc, err := spawnCmd(cmdArgs) if err != nil { - return snapshotStatus{}, err + return snapshot.Status{}, err } var txt string txt, err = proc.Expect("totalKey") if err != nil { - return snapshotStatus{}, err + return snapshot.Status{}, err } if err = proc.Close(); err != nil { - return snapshotStatus{}, err + return snapshot.Status{}, err } - resp := snapshotStatus{} + resp := snapshot.Status{} dec := json.NewDecoder(strings.NewReader(txt)) if err := dec.Decode(&resp); err == io.EOF { - return snapshotStatus{}, err + return snapshot.Status{}, err } return resp, nil } diff --git a/etcdctl/ctlv3/command/printer.go b/etcdctl/ctlv3/command/printer.go index d03c41d778d..1f3aa09f03a 100644 --- a/etcdctl/ctlv3/command/printer.go +++ b/etcdctl/ctlv3/command/printer.go @@ -20,9 +20,10 @@ import ( "strings" v3 "github.com/coreos/etcd/clientv3" - "github.com/dustin/go-humanize" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/snapshot" + + "github.com/dustin/go-humanize" ) type printer interface { @@ -48,7 +49,7 @@ 
type printer interface { MoveLeader(leader, target uint64, r v3.MoveLeaderResponse) Alarm(v3.AlarmResponse) - DBStatus(dbstatus) + DBStatus(snapshot.Status) RoleAdd(role string, r v3.AuthRoleAddResponse) RoleGet(role string, r v3.AuthRoleGetResponse) @@ -150,7 +151,7 @@ func newPrinterUnsupported(n string) printer { func (p *printerUnsupported) EndpointStatus([]epStatus) { p.p(nil) } func (p *printerUnsupported) EndpointHashKV([]epHashKV) { p.p(nil) } -func (p *printerUnsupported) DBStatus(dbstatus) { p.p(nil) } +func (p *printerUnsupported) DBStatus(snapshot.Status) { p.p(nil) } func (p *printerUnsupported) MoveLeader(leader, target uint64, r v3.MoveLeaderResponse) { p.p(nil) } @@ -200,7 +201,7 @@ func makeEndpointHashKVTable(hashList []epHashKV) (hdr []string, rows [][]string return hdr, rows } -func makeDBStatusTable(ds dbstatus) (hdr []string, rows [][]string) { +func makeDBStatusTable(ds snapshot.Status) (hdr []string, rows [][]string) { hdr = []string{"hash", "revision", "total keys", "total size"} rows = append(rows, []string{ fmt.Sprintf("%x", ds.Hash), diff --git a/etcdctl/ctlv3/command/printer_fields.go b/etcdctl/ctlv3/command/printer_fields.go index 3b8ad9e4373..61cbbf33bcb 100644 --- a/etcdctl/ctlv3/command/printer_fields.go +++ b/etcdctl/ctlv3/command/printer_fields.go @@ -20,6 +20,7 @@ import ( v3 "github.com/coreos/etcd/clientv3" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" spb "github.com/coreos/etcd/mvcc/mvccpb" + "github.com/coreos/etcd/snapshot" ) type fieldsPrinter struct{ printer } @@ -172,7 +173,7 @@ func (p *fieldsPrinter) Alarm(r v3.AlarmResponse) { } } -func (p *fieldsPrinter) DBStatus(r dbstatus) { +func (p *fieldsPrinter) DBStatus(r snapshot.Status) { fmt.Println(`"Hash" :`, r.Hash) fmt.Println(`"Revision" :`, r.Revision) fmt.Println(`"Keys" :`, r.TotalKey) diff --git a/etcdctl/ctlv3/command/printer_json.go b/etcdctl/ctlv3/command/printer_json.go index 19b3a5e688b..eb985aae7c5 100644 --- a/etcdctl/ctlv3/command/printer_json.go +++ 
b/etcdctl/ctlv3/command/printer_json.go @@ -18,6 +18,8 @@ import ( "encoding/json" "fmt" "os" + + "github.com/coreos/etcd/snapshot" ) type jsonPrinter struct{ printer } @@ -30,7 +32,7 @@ func newJSONPrinter() printer { func (p *jsonPrinter) EndpointStatus(r []epStatus) { printJSON(r) } func (p *jsonPrinter) EndpointHashKV(r []epHashKV) { printJSON(r) } -func (p *jsonPrinter) DBStatus(r dbstatus) { printJSON(r) } +func (p *jsonPrinter) DBStatus(r snapshot.Status) { printJSON(r) } func printJSON(v interface{}) { b, err := json.Marshal(v) diff --git a/etcdctl/ctlv3/command/printer_simple.go b/etcdctl/ctlv3/command/printer_simple.go index 2f4f53201c8..1b36b88cdd4 100644 --- a/etcdctl/ctlv3/command/printer_simple.go +++ b/etcdctl/ctlv3/command/printer_simple.go @@ -21,6 +21,7 @@ import ( v3 "github.com/coreos/etcd/clientv3" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/snapshot" ) type simplePrinter struct { @@ -155,7 +156,7 @@ func (s *simplePrinter) EndpointHashKV(hashList []epHashKV) { } } -func (s *simplePrinter) DBStatus(ds dbstatus) { +func (s *simplePrinter) DBStatus(ds snapshot.Status) { _, rows := makeDBStatusTable(ds) for _, row := range rows { fmt.Println(strings.Join(row, ", ")) diff --git a/etcdctl/ctlv3/command/printer_table.go b/etcdctl/ctlv3/command/printer_table.go index 1aea61a8456..0ab97b4e559 100644 --- a/etcdctl/ctlv3/command/printer_table.go +++ b/etcdctl/ctlv3/command/printer_table.go @@ -17,9 +17,10 @@ package command import ( "os" - "github.com/olekukonko/tablewriter" - v3 "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/snapshot" + + "github.com/olekukonko/tablewriter" ) type tablePrinter struct{ printer } @@ -54,7 +55,7 @@ func (tp *tablePrinter) EndpointHashKV(r []epHashKV) { table.SetAlignment(tablewriter.ALIGN_RIGHT) table.Render() } -func (tp *tablePrinter) DBStatus(r dbstatus) { +func (tp *tablePrinter) DBStatus(r snapshot.Status) { hdr, rows := 
makeDBStatusTable(r) table := tablewriter.NewWriter(os.Stdout) table.SetHeader(hdr) diff --git a/etcdctl/ctlv3/command/snapshot_command.go b/etcdctl/ctlv3/command/snapshot_command.go index 2929cd94e8c..6e07c2a927d 100644 --- a/etcdctl/ctlv3/command/snapshot_command.go +++ b/etcdctl/ctlv3/command/snapshot_command.go @@ -16,34 +16,15 @@ package command import ( "context" - "crypto/sha256" - "encoding/binary" - "encoding/json" "fmt" - "hash/crc32" - "io" - "math" - "os" "path/filepath" - "reflect" "strings" - "github.com/coreos/etcd/etcdserver" - "github.com/coreos/etcd/etcdserver/etcdserverpb" - "github.com/coreos/etcd/etcdserver/membership" - "github.com/coreos/etcd/lease" - "github.com/coreos/etcd/mvcc" - "github.com/coreos/etcd/mvcc/backend" - "github.com/coreos/etcd/pkg/fileutil" + "github.com/coreos/etcd/pkg/logger" "github.com/coreos/etcd/pkg/types" - "github.com/coreos/etcd/raft" - "github.com/coreos/etcd/raft/raftpb" - "github.com/coreos/etcd/snap" - "github.com/coreos/etcd/store" - "github.com/coreos/etcd/wal" - "github.com/coreos/etcd/wal/walpb" + "github.com/coreos/etcd/snapshot" - bolt "github.com/coreos/bbolt" + "github.com/coreos/pkg/capnslog" "github.com/spf13/cobra" ) @@ -116,34 +97,19 @@ func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) { ExitWithError(ExitBadArgs, err) } - path := args[0] - - partpath := path + ".part" - f, err := os.Create(partpath) - + lg := logger.NewDiscardLogger() + debug, err := cmd.Flags().GetBool("debug") if err != nil { - exiterr := fmt.Errorf("could not open %s (%v)", partpath, err) - ExitWithError(ExitBadArgs, exiterr) - } - - c := mustClientFromCmd(cmd) - r, serr := c.Snapshot(context.TODO()) - if serr != nil { - os.RemoveAll(partpath) - ExitWithError(ExitInterrupted, serr) + ExitWithError(ExitError, err) } - if _, rerr := io.Copy(f, r); rerr != nil { - os.RemoveAll(partpath) - ExitWithError(ExitInterrupted, rerr) + if debug { + lg = 
logger.NewPackageLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "snapshot")) } + sp := snapshot.NewV3(mustClientFromCmd(cmd), lg) - fileutil.Fsync(f) - - f.Close() - - if rerr := os.Rename(partpath, path); rerr != nil { - exiterr := fmt.Errorf("could not rename %s to %s (%v)", partpath, path, rerr) - ExitWithError(ExitIO, exiterr) + path := args[0] + if err := sp.Save(context.TODO(), path); err != nil { + ExitWithError(ExitInterrupted, err) } fmt.Printf("Snapshot saved at %s\n", path) } @@ -154,7 +120,21 @@ func snapshotStatusCommandFunc(cmd *cobra.Command, args []string) { ExitWithError(ExitBadArgs, err) } initDisplayFromCmd(cmd) - ds := dbStatus(args[0]) + + lg := logger.NewDiscardLogger() + debug, err := cmd.Flags().GetBool("debug") + if err != nil { + ExitWithError(ExitError, err) + } + if debug { + lg = logger.NewPackageLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "snapshot")) + } + sp := snapshot.NewV3(nil, lg) + + ds, err := sp.Status(args[0]) + if err != nil { + ExitWithError(ExitError, err) + } display.DBStatus(ds) } @@ -169,38 +149,37 @@ func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) { ExitWithError(ExitBadArgs, uerr) } - cfg := etcdserver.ServerConfig{ - InitialClusterToken: restoreClusterToken, - InitialPeerURLsMap: urlmap, - PeerURLs: types.MustNewURLs(strings.Split(restorePeerURLs, ",")), - Name: restoreName, - } - if err := cfg.VerifyBootstrap(); err != nil { - ExitWithError(ExitBadArgs, err) + dataDir := restoreDataDir + if dataDir == "" { + dataDir = restoreName + ".etcd" } - cl, cerr := membership.NewClusterFromURLsMap(restoreClusterToken, urlmap) - if cerr != nil { - ExitWithError(ExitBadArgs, cerr) + walDir := restoreWalDir + if walDir == "" { + walDir = filepath.Join(dataDir, "member", "wal") } - basedir := restoreDataDir - if basedir == "" { - basedir = restoreName + ".etcd" + lg := logger.NewDiscardLogger() + debug, err := cmd.Flags().GetBool("debug") + if err != nil { + ExitWithError(ExitError, 
err) } - - waldir := restoreWalDir - if waldir == "" { - waldir = filepath.Join(basedir, "member", "wal") + if debug { + lg = logger.NewPackageLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "snapshot")) } - snapdir := filepath.Join(basedir, "member", "snap") + sp := snapshot.NewV3(nil, lg) - if _, err := os.Stat(basedir); err == nil { - ExitWithError(ExitInvalidInput, fmt.Errorf("data-dir %q exists", basedir)) + if err := sp.Restore(args[0], snapshot.RestoreConfig{ + Name: restoreName, + OutputDataDir: dataDir, + OutputWALDir: walDir, + InitialCluster: urlmap, + InitialClusterToken: restoreClusterToken, + PeerURLs: types.MustNewURLs(strings.Split(restorePeerURLs, ",")), + SkipHashCheck: skipHashCheck, + }); err != nil { + ExitWithError(ExitError, err) } - - makeDB(snapdir, args[0], len(cl.Members())) - makeWALAndSnap(waldir, snapdir, cl) } func initialClusterFromName(name string) string { @@ -210,258 +189,3 @@ func initialClusterFromName(name string) string { } return fmt.Sprintf("%s=http://localhost:2380", n) } - -// makeWAL creates a WAL for the initial cluster -func makeWALAndSnap(waldir, snapdir string, cl *membership.RaftCluster) { - if err := fileutil.CreateDirAll(waldir); err != nil { - ExitWithError(ExitIO, err) - } - - // add members again to persist them to the store we create. 
- st := store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix) - cl.SetStore(st) - for _, m := range cl.Members() { - cl.AddMember(m) - } - - m := cl.MemberByName(restoreName) - md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(cl.ID())} - metadata, merr := md.Marshal() - if merr != nil { - ExitWithError(ExitInvalidInput, merr) - } - - w, walerr := wal.Create(waldir, metadata) - if walerr != nil { - ExitWithError(ExitIO, walerr) - } - defer w.Close() - - peers := make([]raft.Peer, len(cl.MemberIDs())) - for i, id := range cl.MemberIDs() { - ctx, err := json.Marshal((*cl).Member(id)) - if err != nil { - ExitWithError(ExitInvalidInput, err) - } - peers[i] = raft.Peer{ID: uint64(id), Context: ctx} - } - - ents := make([]raftpb.Entry, len(peers)) - nodeIDs := make([]uint64, len(peers)) - for i, p := range peers { - nodeIDs[i] = p.ID - cc := raftpb.ConfChange{ - Type: raftpb.ConfChangeAddNode, - NodeID: p.ID, - Context: p.Context} - d, err := cc.Marshal() - if err != nil { - ExitWithError(ExitInvalidInput, err) - } - e := raftpb.Entry{ - Type: raftpb.EntryConfChange, - Term: 1, - Index: uint64(i + 1), - Data: d, - } - ents[i] = e - } - - commit, term := uint64(len(ents)), uint64(1) - - if err := w.Save(raftpb.HardState{ - Term: term, - Vote: peers[0].ID, - Commit: commit}, ents); err != nil { - ExitWithError(ExitIO, err) - } - - b, berr := st.Save() - if berr != nil { - ExitWithError(ExitError, berr) - } - - raftSnap := raftpb.Snapshot{ - Data: b, - Metadata: raftpb.SnapshotMetadata{ - Index: commit, - Term: term, - ConfState: raftpb.ConfState{ - Nodes: nodeIDs, - }, - }, - } - snapshotter := snap.New(snapdir) - if err := snapshotter.SaveSnap(raftSnap); err != nil { - panic(err) - } - - if err := w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term}); err != nil { - ExitWithError(ExitIO, err) - } -} - -// initIndex implements ConsistentIndexGetter so the snapshot won't block -// the new raft instance by waiting for a future raft 
index. -type initIndex int - -func (i *initIndex) ConsistentIndex() uint64 { return uint64(*i) } - -// makeDB copies the database snapshot to the snapshot directory -func makeDB(snapdir, dbfile string, commit int) { - f, ferr := os.OpenFile(dbfile, os.O_RDONLY, 0600) - if ferr != nil { - ExitWithError(ExitInvalidInput, ferr) - } - defer f.Close() - - // get snapshot integrity hash - if _, err := f.Seek(-sha256.Size, io.SeekEnd); err != nil { - ExitWithError(ExitIO, err) - } - sha := make([]byte, sha256.Size) - if _, err := f.Read(sha); err != nil { - ExitWithError(ExitIO, err) - } - if _, err := f.Seek(0, io.SeekStart); err != nil { - ExitWithError(ExitIO, err) - } - - if err := fileutil.CreateDirAll(snapdir); err != nil { - ExitWithError(ExitIO, err) - } - - dbpath := filepath.Join(snapdir, "db") - db, dberr := os.OpenFile(dbpath, os.O_RDWR|os.O_CREATE, 0600) - if dberr != nil { - ExitWithError(ExitIO, dberr) - } - if _, err := io.Copy(db, f); err != nil { - ExitWithError(ExitIO, err) - } - - // truncate away integrity hash, if any. 
- off, serr := db.Seek(0, io.SeekEnd) - if serr != nil { - ExitWithError(ExitIO, serr) - } - hasHash := (off % 512) == sha256.Size - if hasHash { - if err := db.Truncate(off - sha256.Size); err != nil { - ExitWithError(ExitIO, err) - } - } - - if !hasHash && !skipHashCheck { - err := fmt.Errorf("snapshot missing hash but --skip-hash-check=false") - ExitWithError(ExitBadArgs, err) - } - - if hasHash && !skipHashCheck { - // check for match - if _, err := db.Seek(0, io.SeekStart); err != nil { - ExitWithError(ExitIO, err) - } - h := sha256.New() - if _, err := io.Copy(h, db); err != nil { - ExitWithError(ExitIO, err) - } - dbsha := h.Sum(nil) - if !reflect.DeepEqual(sha, dbsha) { - err := fmt.Errorf("expected sha256 %v, got %v", sha, dbsha) - ExitWithError(ExitInvalidInput, err) - } - } - - // db hash is OK, can now modify DB so it can be part of a new cluster - db.Close() - - // update consistentIndex so applies go through on etcdserver despite - // having a new raft instance - be := backend.NewDefaultBackend(dbpath) - // a lessor never timeouts leases - lessor := lease.NewLessor(be, math.MaxInt64) - s := mvcc.NewStore(be, lessor, (*initIndex)(&commit)) - txn := s.Write() - btx := be.BatchTx() - del := func(k, v []byte) error { - txn.DeleteRange(k, nil) - return nil - } - - // delete stored members from old cluster since using new members - btx.UnsafeForEach([]byte("members"), del) - // todo: add back new members when we start to deprecate old snap file. 
- btx.UnsafeForEach([]byte("members_removed"), del) - // trigger write-out of new consistent index - txn.End() - s.Commit() - s.Close() - be.Close() -} - -type dbstatus struct { - Hash uint32 `json:"hash"` - Revision int64 `json:"revision"` - TotalKey int `json:"totalKey"` - TotalSize int64 `json:"totalSize"` -} - -func dbStatus(p string) dbstatus { - if _, err := os.Stat(p); err != nil { - ExitWithError(ExitError, err) - } - - ds := dbstatus{} - - db, err := bolt.Open(p, 0400, &bolt.Options{ReadOnly: true}) - if err != nil { - ExitWithError(ExitError, err) - } - defer db.Close() - - h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) - - err = db.View(func(tx *bolt.Tx) error { - ds.TotalSize = tx.Size() - c := tx.Cursor() - for next, _ := c.First(); next != nil; next, _ = c.Next() { - b := tx.Bucket(next) - if b == nil { - return fmt.Errorf("cannot get hash of bucket %s", string(next)) - } - h.Write(next) - iskeyb := (string(next) == "key") - b.ForEach(func(k, v []byte) error { - h.Write(k) - h.Write(v) - if iskeyb { - rev := bytesToRev(k) - ds.Revision = rev.main - } - ds.TotalKey++ - return nil - }) - } - return nil - }) - - if err != nil { - ExitWithError(ExitError, err) - } - - ds.Hash = h.Sum32() - return ds -} - -type revision struct { - main int64 - sub int64 -} - -func bytesToRev(bytes []byte) revision { - return revision{ - main: int64(binary.BigEndian.Uint64(bytes[0:8])), - sub: int64(binary.BigEndian.Uint64(bytes[9:])), - } -} diff --git a/snapshot/doc.go b/snapshot/doc.go new file mode 100644 index 00000000000..1c761be70d1 --- /dev/null +++ b/snapshot/doc.go @@ -0,0 +1,16 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package snapshot implements utilities around etcd snapshot. +package snapshot diff --git a/snapshot/util.go b/snapshot/util.go new file mode 100644 index 00000000000..93ba70b6c61 --- /dev/null +++ b/snapshot/util.go @@ -0,0 +1,35 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package snapshot + +import "encoding/binary" + +type revision struct { + main int64 + sub int64 +} + +func bytesToRev(bytes []byte) revision { + return revision{ + main: int64(binary.BigEndian.Uint64(bytes[0:8])), + sub: int64(binary.BigEndian.Uint64(bytes[9:])), + } +} + +// initIndex implements ConsistentIndexGetter so the snapshot won't block +// the new raft instance by waiting for a future raft index. 
+type initIndex int
+
+func (i *initIndex) ConsistentIndex() uint64 { return uint64(*i) }
diff --git a/snapshot/v3_snapshot.go b/snapshot/v3_snapshot.go
new file mode 100644
index 00000000000..09d25b11c8f
--- /dev/null
+++ b/snapshot/v3_snapshot.go
@@ -0,0 +1,480 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package snapshot
+
+import (
+	"context"
+	"crypto/sha256"
+	"encoding/json"
+	"fmt"
+	"hash/crc32"
+	"io"
+	"math"
+	"os"
+	"path/filepath"
+	"reflect"
+
+	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/etcdserver/membership"
+	"github.com/coreos/etcd/lease"
+	"github.com/coreos/etcd/mvcc"
+	"github.com/coreos/etcd/mvcc/backend"
+	"github.com/coreos/etcd/pkg/fileutil"
+	"github.com/coreos/etcd/pkg/logger"
+	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft"
+	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/snap"
+	"github.com/coreos/etcd/store"
+	"github.com/coreos/etcd/wal"
+	"github.com/coreos/etcd/wal/walpb"
+
+	bolt "github.com/coreos/bbolt"
+)
+
+// Manager defines snapshot methods.
+type Manager interface {
+	// Save fetches snapshot from specified client's endpoints and saves to target path.
+	// If the context "ctx" is canceled or timed out, snapshot save stream will error out
+	// (e.g. context.Canceled, context.DeadlineExceeded).
+	Save(ctx context.Context, dbPath string) error
+
+	// Status returns the snapshot file information.
+	Status(dbPath string) (Status, error)
+
+	// Restore restores a new etcd data directory from given snapshot file.
+	Restore(dbPath string, cfg RestoreConfig) error
+}
+
+// Status is the snapshot file status.
+type Status struct {
+	Hash      uint32 `json:"hash"`
+	Revision  int64  `json:"revision"`
+	TotalKey  int    `json:"totalKey"`
+	TotalSize int64  `json:"totalSize"`
+}
+
+// RestoreConfig configures snapshot restore operation.
+type RestoreConfig struct {
+	// Name is the human-readable name of this member.
+	Name string
+	// OutputDataDir is the target data directory to save restored data.
+	// Defaults to "[Name].etcd" if not given.
+	OutputDataDir string
+	// OutputWALDir is the target WAL data directory.
+	// Defaults to "[OutputDataDir]/member/wal" if not given.
+	OutputWALDir string
+	// InitialCluster is the initial cluster configuration for restore bootstrap.
+	InitialCluster types.URLsMap
+	// InitialClusterToken is the initial cluster token for etcd cluster during restore bootstrap.
+	InitialClusterToken string
+	// PeerURLs is a list of member's peer URLs to advertise to the rest of the cluster.
+	PeerURLs types.URLs
+	// SkipHashCheck is "true" to ignore snapshot integrity hash value
+	// (required if copied from data directory).
+	SkipHashCheck bool
+}
+
+// NewV3 returns a new snapshot Manager for v3.x snapshot.
+// "*clientv3.Client" is only used for "Save" method.
+// Otherwise, pass "nil".
+func NewV3(cli *clientv3.Client, lg logger.Logger) Manager {
+	if lg == nil {
+		lg = logger.NewDiscardLogger()
+	}
+	return &v3Manager{cli: cli, logger: lg}
+}
+
+type v3Manager struct {
+	cli *clientv3.Client
+
+	name    string
+	dbPath  string
+	walDir  string
+	snapDir string
+	cl      *membership.RaftCluster
+
+	skipHashCheck bool
+	logger        logger.Logger
+}
+
+// Save fetches a snapshot from the client's endpoints and writes it to "dbPath".
+// The data is first streamed into "dbPath.part", which is fsynced and renamed
+// into place only on success; on any failure the partial file is closed and removed.
+func (s *v3Manager) Save(ctx context.Context, dbPath string) (err error) {
+	if s.cli == nil {
+		return fmt.Errorf("snapshot save requires a non-nil client")
+	}
+
+	partpath := dbPath + ".part"
+	f, err := os.Create(partpath)
+	if err != nil {
+		return fmt.Errorf("could not open %s (%v)", partpath, err)
+	}
+	s.logger.Infof("created temporary db file %q", partpath)
+
+	// From here on, any failure must release and delete the partial file.
+	closed := false
+	defer func() {
+		if err == nil {
+			return
+		}
+		if !closed {
+			f.Close()
+		}
+		os.RemoveAll(partpath)
+	}()
+
+	var rd io.ReadCloser
+	rd, err = s.cli.Snapshot(ctx)
+	if err != nil {
+		return err
+	}
+	// The stream is read-only, so a close error cannot affect the saved data.
+	defer rd.Close()
+
+	s.logger.Infof("copying from snapshot stream")
+	if _, err = io.Copy(f, rd); err != nil {
+		return err
+	}
+	if err = fileutil.Fsync(f); err != nil {
+		return err
+	}
+	closed = true
+	if err = f.Close(); err != nil {
+		return err
+	}
+
+	s.logger.Infof("renaming from %q to %q", partpath, dbPath)
+	if err = os.Rename(partpath, dbPath); err != nil {
+		return fmt.Errorf("could not rename %s to %s (%v)", partpath, dbPath, err)
+	}
+	return nil
+}
+
+// Status opens the snapshot file at "dbPath" read-only and reports its size,
+// CRC-32C hash, entry count, and the main revision of the last key in the "key" bucket.
+func (s *v3Manager) Status(dbPath string) (ds Status, err error) {
+	if _, err = os.Stat(dbPath); err != nil {
+		return ds, err
+	}
+
+	db, err := bolt.Open(dbPath, 0400, &bolt.Options{ReadOnly: true})
+	if err != nil {
+		return ds, err
+	}
+	defer db.Close()
+
+	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
+
+	if err = db.View(func(tx *bolt.Tx) error {
+		ds.TotalSize = tx.Size()
+		c := tx.Cursor()
+		for next, _ := c.First(); next != nil; next, _ = c.Next() {
+			b := tx.Bucket(next)
+			if b == nil {
+				return fmt.Errorf("cannot get hash of bucket %s", string(next))
+			}
+			h.Write(next)
+			iskeyb := (string(next) == "key")
+			// Propagate iteration failures rather than report a partial hash.
+			if ferr := b.ForEach(func(k, v []byte) error {
+				h.Write(k)
+				h.Write(v)
+				if iskeyb {
+					rev := bytesToRev(k)
+					ds.Revision = rev.main
+				}
+				ds.TotalKey++
+				return nil
+			}); ferr != nil {
+				return ferr
+			}
+		}
+		return nil
+	}); err != nil {
+		return Status{}, err
+	}
+
+	ds.Hash = h.Sum32()
+	return ds, nil
+}
+
+// Restore creates a new data directory from the snapshot file "dbPath".
+// It fails if the bootstrap configuration is invalid or if the target
+// data directory (or an explicitly given WAL directory) already exists.
+func (s *v3Manager) Restore(dbPath string, cfg RestoreConfig) error {
+	srv := etcdserver.ServerConfig{
+		Name:                cfg.Name,
+		InitialClusterToken: cfg.InitialClusterToken,
+		InitialPeerURLsMap:  cfg.InitialCluster,
+		PeerURLs:            cfg.PeerURLs,
+	}
+	if err := srv.VerifyBootstrap(); err != nil {
+		return err
+	}
+
+	var err error
+	s.cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialCluster)
+	if err != nil {
+		return err
+	}
+
+	dataDir := cfg.OutputDataDir
+	if dataDir == "" {
+		dataDir = cfg.Name + ".etcd"
+	}
+	if _, err = os.Stat(dataDir); err == nil {
+		return fmt.Errorf("data-dir %q exists", dataDir)
+	}
+	walDir := cfg.OutputWALDir
+	if walDir == "" {
+		walDir = filepath.Join(dataDir, "member", "wal")
+	} else if _, err = os.Stat(walDir); err == nil {
+		return fmt.Errorf("wal-dir %q exists", walDir)
+	}
+	s.logger.Infof("restoring snapshot file %q to data-dir %q, wal-dir %q", dbPath, dataDir, walDir)
+
+	s.name = cfg.Name
+	s.dbPath = dbPath
+	s.walDir = walDir
+	s.snapDir = filepath.Join(dataDir, "member", "snap")
+	s.skipHashCheck = cfg.SkipHashCheck
+
+	s.logger.Infof("writing snapshot directory %q", s.snapDir)
+	if err = s.saveDB(); err != nil {
+		return err
+	}
+	s.logger.Infof("writing WAL directory %q and raft snapshot to %q", s.walDir, s.snapDir)
+	if err = s.saveWALAndSnap(); err != nil {
+		return err
+	}
+	s.logger.Infof("finished restore %q to data directory %q, wal directory %q", dbPath, dataDir, walDir)
+	return nil
+}
+
+// saveDB copies the database snapshot to the snapshot directory, verifies
+// its integrity hash, and rewrites it so it can join a new cluster.
+func (s *v3Manager) saveDB() error {
+	f, ferr := os.OpenFile(s.dbPath, os.O_RDONLY, 0600)
+	if ferr != nil {
+		return ferr
+	}
+	defer f.Close()
+
+	// get snapshot integrity hash
+	if _, err := f.Seek(-sha256.Size, io.SeekEnd); err != nil {
+		return err
+	}
+	sha := make([]byte, sha256.Size)
+	// ReadFull rejects a short read instead of comparing a partial hash.
+	if _, err := io.ReadFull(f, sha); err != nil {
+		return err
+	}
+	if _, err := f.Seek(0, io.SeekStart); err != nil {
+		return err
+	}
+
+	if err := fileutil.CreateDirAll(s.snapDir); err != nil {
+		return err
+	}
+
+	dbpath := filepath.Join(s.snapDir, "db")
+	if err := s.copyAndVerifyDB(f, dbpath, sha); err != nil {
+		return err
+	}
+
+	// db hash is OK, can now modify DB so it can be part of a new cluster
+	commit := len(s.cl.Members())
+
+	// update consistentIndex so applies go through on etcdserver despite
+	// having a new raft instance
+	be := backend.NewDefaultBackend(dbpath)
+
+	// a lessor that never times out leases
+	lessor := lease.NewLessor(be, math.MaxInt64)
+
+	mvs := mvcc.NewStore(be, lessor, (*initIndex)(&commit))
+	txn := mvs.Write()
+	btx := be.BatchTx()
+	del := func(k, v []byte) error {
+		txn.DeleteRange(k, nil)
+		return nil
+	}
+
+	// delete stored members from old cluster since using new members
+	btx.UnsafeForEach([]byte("members"), del)
+
+	// todo: add back new members when we start to deprecate old snap file.
+	btx.UnsafeForEach([]byte("members_removed"), del)
+
+	// trigger write-out of new consistent index
+	txn.End()
+
+	mvs.Commit()
+	mvs.Close()
+	be.Close()
+
+	return nil
+}
+
+// copyAndVerifyDB copies the snapshot "src" into a new file at "dbpath",
+// truncates the trailing integrity hash if present, and unless hash checking
+// is skipped, verifies the copied contents against "sha".
+// The destination file is closed on every return path.
+func (s *v3Manager) copyAndVerifyDB(src io.Reader, dbpath string, sha []byte) (err error) {
+	db, err := os.OpenFile(dbpath, os.O_RDWR|os.O_CREATE, 0600)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		// A failed close may mean lost writes; report it unless an
+		// earlier error is already being returned.
+		if cerr := db.Close(); err == nil {
+			err = cerr
+		}
+	}()
+
+	if _, err = io.Copy(db, src); err != nil {
+		return err
+	}
+
+	// truncate away integrity hash, if any.
+	off, err := db.Seek(0, io.SeekEnd)
+	if err != nil {
+		return err
+	}
+	hasHash := (off % 512) == sha256.Size
+	if hasHash {
+		if err = db.Truncate(off - sha256.Size); err != nil {
+			return err
+		}
+	}
+
+	if !hasHash && !s.skipHashCheck {
+		return fmt.Errorf("snapshot missing hash but --skip-hash-check=false")
+	}
+
+	if hasHash && !s.skipHashCheck {
+		// check for match
+		if _, err = db.Seek(0, io.SeekStart); err != nil {
+			return err
+		}
+		h := sha256.New()
+		if _, err = io.Copy(h, db); err != nil {
+			return err
+		}
+		dbsha := h.Sum(nil)
+		if !reflect.DeepEqual(sha, dbsha) {
+			return fmt.Errorf("expected sha256 %v, got %v", sha, dbsha)
+		}
+	}
+	return nil
+}
+
+// saveWALAndSnap creates a WAL for the initial cluster and saves the matching raft snapshot.
+func (s *v3Manager) saveWALAndSnap() error {
+	if err := fileutil.CreateDirAll(s.walDir); err != nil {
+		return err
+	}
+
+	// add members again to persist them to the store we create.
+	st := store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
+	s.cl.SetStore(st)
+	for _, m := range s.cl.Members() {
+		s.cl.AddMember(m)
+	}
+
+	m := s.cl.MemberByName(s.name)
+	if m == nil {
+		return fmt.Errorf("member %q not found in initial cluster", s.name)
+	}
+	md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(s.cl.ID())}
+	metadata, merr := md.Marshal()
+	if merr != nil {
+		return merr
+	}
+	w, walerr := wal.Create(s.walDir, metadata)
+	if walerr != nil {
+		return walerr
+	}
+	defer w.Close()
+
+	peers := make([]raft.Peer, len(s.cl.MemberIDs()))
+	for i, id := range s.cl.MemberIDs() {
+		ctx, err := json.Marshal((*s.cl).Member(id))
+		if err != nil {
+			return err
+		}
+		peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
+	}
+
+	ents := make([]raftpb.Entry, len(peers))
+	nodeIDs := make([]uint64, len(peers))
+	for i, p := range peers {
+		nodeIDs[i] = p.ID
+		cc := raftpb.ConfChange{
+			Type:    raftpb.ConfChangeAddNode,
+			NodeID:  p.ID,
+			Context: p.Context,
+		}
+		d, err := cc.Marshal()
+		if err != nil {
+			return err
+		}
+		ents[i] = raftpb.Entry{
+			Type:  raftpb.EntryConfChange,
+			Term:  1,
+			Index: uint64(i + 1),
+			Data:  d,
+		}
+	}
+
+	commit, term := uint64(len(ents)), uint64(1)
+	if err := w.Save(raftpb.HardState{
+		Term:   term,
+		Vote:   peers[0].ID,
+		Commit: commit,
+	}, ents); err != nil {
+		return err
+	}
+
+	b, berr := st.Save()
+	if berr != nil {
+		return berr
+	}
+	raftSnap := raftpb.Snapshot{
+		Data: b,
+		Metadata: raftpb.SnapshotMetadata{
+			Index: commit,
+			Term:  term,
+			ConfState: raftpb.ConfState{
+				Nodes: nodeIDs,
+			},
+		},
+	}
+	sn := snap.New(s.snapDir)
+	if err := sn.SaveSnap(raftSnap); err != nil {
+		return err
+	}
+
+	err := w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term})
+	if err == nil {
+		s.logger.Infof("wrote WAL snapshot to %q", s.walDir)
+	}
+	return err
+}