From 2ecb174c5efc780de68f46a52cea33b083deffee Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Wed, 10 Jan 2018 14:54:00 -0800 Subject: [PATCH] etcdctl/ctlv3: use "snapshot" package for "snapshot" command Signed-off-by: Gyuho Lee --- etcdctl/ctlv3/command/snapshot_command.go | 371 +++------------------- 1 file changed, 46 insertions(+), 325 deletions(-) diff --git a/etcdctl/ctlv3/command/snapshot_command.go b/etcdctl/ctlv3/command/snapshot_command.go index 2929cd94e8c7..9a3925bcd63e 100644 --- a/etcdctl/ctlv3/command/snapshot_command.go +++ b/etcdctl/ctlv3/command/snapshot_command.go @@ -16,34 +16,13 @@ package command import ( "context" - "crypto/sha256" - "encoding/binary" - "encoding/json" "fmt" - "hash/crc32" - "io" - "math" - "os" "path/filepath" - "reflect" "strings" - "github.com/coreos/etcd/etcdserver" - "github.com/coreos/etcd/etcdserver/etcdserverpb" - "github.com/coreos/etcd/etcdserver/membership" - "github.com/coreos/etcd/lease" - "github.com/coreos/etcd/mvcc" - "github.com/coreos/etcd/mvcc/backend" - "github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/types" - "github.com/coreos/etcd/raft" - "github.com/coreos/etcd/raft/raftpb" - "github.com/coreos/etcd/snap" - "github.com/coreos/etcd/store" - "github.com/coreos/etcd/wal" - "github.com/coreos/etcd/wal/walpb" + "github.com/coreos/etcd/snapshot" - bolt "github.com/coreos/bbolt" "github.com/spf13/cobra" ) @@ -118,32 +97,18 @@ func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) { path := args[0] - partpath := path + ".part" - f, err := os.Create(partpath) - + scfg := snapshot.Config{} + debug, err := cmd.Flags().GetBool("debug") if err != nil { - exiterr := fmt.Errorf("could not open %s (%v)", partpath, err) - ExitWithError(ExitBadArgs, exiterr) - } - - c := mustClientFromCmd(cmd) - r, serr := c.Snapshot(context.TODO()) - if serr != nil { - os.RemoveAll(partpath) - ExitWithError(ExitInterrupted, serr) + ExitWithError(ExitError, err) } - if _, rerr := io.Copy(f, r); rerr != nil { - os.RemoveAll(partpath) - ExitWithError(ExitInterrupted, rerr) + if debug { + scfg.Logger = snapshot.DefaultLogger } - fileutil.Fsync(f) - - f.Close() - - if rerr := os.Rename(partpath, path); rerr != nil { - exiterr := fmt.Errorf("could not rename %s to %s (%v)", partpath, path, rerr) - ExitWithError(ExitIO, exiterr) + sp := snapshot.NewV3(scfg) + if err := sp.Save(context.TODO(), mustClientFromCmd(cmd), path); err != nil { + ExitWithError(ExitInterrupted, err) } fmt.Printf("Snapshot saved at %s\n", path) } @@ -154,7 +119,20 @@ func snapshotStatusCommandFunc(cmd *cobra.Command, args []string) { ExitWithError(ExitBadArgs, err) } initDisplayFromCmd(cmd) - ds := dbStatus(args[0]) + + scfg := snapshot.Config{} + debug, err := cmd.Flags().GetBool("debug") + if err != nil { + ExitWithError(ExitError, err) + } + if debug { + scfg.Logger = snapshot.DefaultLogger + } + sp := snapshot.NewV3(scfg) + ds, err := sp.Status(args[0]) + if err != nil { + ExitWithError(ExitError, err) + } display.DBStatus(ds) } @@ -169,38 +147,36 @@ func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) { ExitWithError(ExitBadArgs, uerr) } - cfg := etcdserver.ServerConfig{ - InitialClusterToken: restoreClusterToken, - InitialPeerURLsMap: urlmap, - PeerURLs: types.MustNewURLs(strings.Split(restorePeerURLs, ",")), - Name: restoreName, - } - if err := cfg.VerifyBootstrap(); err != nil { - ExitWithError(ExitBadArgs, err) + dataDir := restoreDataDir + if dataDir == "" { + dataDir = restoreName + ".etcd" } - cl, cerr := membership.NewClusterFromURLsMap(restoreClusterToken, urlmap) - if cerr != nil { - ExitWithError(ExitBadArgs, cerr) + walDir := restoreWalDir + if walDir == "" { + walDir = filepath.Join(dataDir, "member", "wal") } - basedir := restoreDataDir - if basedir == "" { - basedir = restoreName + ".etcd" + scfg := snapshot.Config{} + debug, err := cmd.Flags().GetBool("debug") + if err != nil { + ExitWithError(ExitError, err) } - - waldir := restoreWalDir - if waldir == "" { - waldir = filepath.Join(basedir, "member", "wal") + if debug { + scfg.Logger = snapshot.DefaultLogger } - snapdir := filepath.Join(basedir, "member", "snap") - - if _, err := os.Stat(basedir); err == nil { - ExitWithError(ExitInvalidInput, fmt.Errorf("data-dir %q exists", basedir)) + sp := snapshot.NewV3(scfg) + if err := sp.Restore(args[0], snapshot.RestoreConfig{ + Name: restoreName, + OutputDataDir: dataDir, + OutputWALDir: walDir, + InitialCluster: urlmap, + InitialClusterToken: restoreClusterToken, + PeerURLs: types.MustNewURLs(strings.Split(restorePeerURLs, ",")), + SkipHashCheck: skipHashCheck, + }); err != nil { + ExitWithError(ExitError, err) } - - makeDB(snapdir, args[0], len(cl.Members())) - makeWALAndSnap(waldir, snapdir, cl) } func initialClusterFromName(name string) string { @@ -210,258 +186,3 @@ func initialClusterFromName(name string) string { } return fmt.Sprintf("%s=http://localhost:2380", n) } - -// makeWAL creates a WAL for the initial cluster -func makeWALAndSnap(waldir, snapdir string, cl *membership.RaftCluster) { - if err := fileutil.CreateDirAll(waldir); err != nil { - ExitWithError(ExitIO, err) - } - - // add members again to persist them to the store we create. - st := store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix) - cl.SetStore(st) - for _, m := range cl.Members() { - cl.AddMember(m) - } - - m := cl.MemberByName(restoreName) - md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(cl.ID())} - metadata, merr := md.Marshal() - if merr != nil { - ExitWithError(ExitInvalidInput, merr) - } - - w, walerr := wal.Create(waldir, metadata) - if walerr != nil { - ExitWithError(ExitIO, walerr) - } - defer w.Close() - - peers := make([]raft.Peer, len(cl.MemberIDs())) - for i, id := range cl.MemberIDs() { - ctx, err := json.Marshal((*cl).Member(id)) - if err != nil { - ExitWithError(ExitInvalidInput, err) - } - peers[i] = raft.Peer{ID: uint64(id), Context: ctx} - } - - ents := make([]raftpb.Entry, len(peers)) - nodeIDs := make([]uint64, len(peers)) - for i, p := range peers { - nodeIDs[i] = p.ID - cc := raftpb.ConfChange{ - Type: raftpb.ConfChangeAddNode, - NodeID: p.ID, - Context: p.Context} - d, err := cc.Marshal() - if err != nil { - ExitWithError(ExitInvalidInput, err) - } - e := raftpb.Entry{ - Type: raftpb.EntryConfChange, - Term: 1, - Index: uint64(i + 1), - Data: d, - } - ents[i] = e - } - - commit, term := uint64(len(ents)), uint64(1) - - if err := w.Save(raftpb.HardState{ - Term: term, - Vote: peers[0].ID, - Commit: commit}, ents); err != nil { - ExitWithError(ExitIO, err) - } - - b, berr := st.Save() - if berr != nil { - ExitWithError(ExitError, berr) - } - - raftSnap := raftpb.Snapshot{ - Data: b, - Metadata: raftpb.SnapshotMetadata{ - Index: commit, - Term: term, - ConfState: raftpb.ConfState{ - Nodes: nodeIDs, - }, - }, - } - snapshotter := snap.New(snapdir) - if err := snapshotter.SaveSnap(raftSnap); err != nil { - panic(err) - } - - if err := w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term}); err != nil { - ExitWithError(ExitIO, err) - } -} - -// initIndex implements ConsistentIndexGetter so the snapshot won't block -// the new raft instance by waiting for a future raft index. -type initIndex int - -func (i *initIndex) ConsistentIndex() uint64 { return uint64(*i) } - -// makeDB copies the database snapshot to the snapshot directory -func makeDB(snapdir, dbfile string, commit int) { - f, ferr := os.OpenFile(dbfile, os.O_RDONLY, 0600) - if ferr != nil { - ExitWithError(ExitInvalidInput, ferr) - } - defer f.Close() - - // get snapshot integrity hash - if _, err := f.Seek(-sha256.Size, io.SeekEnd); err != nil { - ExitWithError(ExitIO, err) - } - sha := make([]byte, sha256.Size) - if _, err := f.Read(sha); err != nil { - ExitWithError(ExitIO, err) - } - if _, err := f.Seek(0, io.SeekStart); err != nil { - ExitWithError(ExitIO, err) - } - - if err := fileutil.CreateDirAll(snapdir); err != nil { - ExitWithError(ExitIO, err) - } - - dbpath := filepath.Join(snapdir, "db") - db, dberr := os.OpenFile(dbpath, os.O_RDWR|os.O_CREATE, 0600) - if dberr != nil { - ExitWithError(ExitIO, dberr) - } - if _, err := io.Copy(db, f); err != nil { - ExitWithError(ExitIO, err) - } - - // truncate away integrity hash, if any. - off, serr := db.Seek(0, io.SeekEnd) - if serr != nil { - ExitWithError(ExitIO, serr) - } - hasHash := (off % 512) == sha256.Size - if hasHash { - if err := db.Truncate(off - sha256.Size); err != nil { - ExitWithError(ExitIO, err) - } - } - - if !hasHash && !skipHashCheck { - err := fmt.Errorf("snapshot missing hash but --skip-hash-check=false") - ExitWithError(ExitBadArgs, err) - } - - if hasHash && !skipHashCheck { - // check for match - if _, err := db.Seek(0, io.SeekStart); err != nil { - ExitWithError(ExitIO, err) - } - h := sha256.New() - if _, err := io.Copy(h, db); err != nil { - ExitWithError(ExitIO, err) - } - dbsha := h.Sum(nil) - if !reflect.DeepEqual(sha, dbsha) { - err := fmt.Errorf("expected sha256 %v, got %v", sha, dbsha) - ExitWithError(ExitInvalidInput, err) - } - } - - // db hash is OK, can now modify DB so it can be part of a new cluster - db.Close() - - // update consistentIndex so applies go through on etcdserver despite - // having a new raft instance - be := backend.NewDefaultBackend(dbpath) - // a lessor never timeouts leases - lessor := lease.NewLessor(be, math.MaxInt64) - s := mvcc.NewStore(be, lessor, (*initIndex)(&commit)) - txn := s.Write() - btx := be.BatchTx() - del := func(k, v []byte) error { - txn.DeleteRange(k, nil) - return nil - } - - // delete stored members from old cluster since using new members - btx.UnsafeForEach([]byte("members"), del) - // todo: add back new members when we start to deprecate old snap file. - btx.UnsafeForEach([]byte("members_removed"), del) - // trigger write-out of new consistent index - txn.End() - s.Commit() - s.Close() - be.Close() -} - -type dbstatus struct { - Hash uint32 `json:"hash"` - Revision int64 `json:"revision"` - TotalKey int `json:"totalKey"` - TotalSize int64 `json:"totalSize"` -} - -func dbStatus(p string) dbstatus { - if _, err := os.Stat(p); err != nil { - ExitWithError(ExitError, err) - } - - ds := dbstatus{} - - db, err := bolt.Open(p, 0400, &bolt.Options{ReadOnly: true}) - if err != nil { - ExitWithError(ExitError, err) - } - defer db.Close() - - h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) - - err = db.View(func(tx *bolt.Tx) error { - ds.TotalSize = tx.Size() - c := tx.Cursor() - for next, _ := c.First(); next != nil; next, _ = c.Next() { - b := tx.Bucket(next) - if b == nil { - return fmt.Errorf("cannot get hash of bucket %s", string(next)) - } - h.Write(next) - iskeyb := (string(next) == "key") - b.ForEach(func(k, v []byte) error { - h.Write(k) - h.Write(v) - if iskeyb { - rev := bytesToRev(k) - ds.Revision = rev.main - } - ds.TotalKey++ - return nil - }) - } - return nil - }) - - if err != nil { - ExitWithError(ExitError, err) - } - - ds.Hash = h.Sum32() - return ds -} - -type revision struct { - main int64 - sub int64 -} - -func bytesToRev(bytes []byte) revision { - return revision{ - main: int64(binary.BigEndian.Uint64(bytes[0:8])), - sub: int64(binary.BigEndian.Uint64(bytes[9:])), - } -}