From 72d52a7441b21148828f9b807181f1f6e0ae17f2 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Sun, 17 May 2020 15:51:39 -0700 Subject: [PATCH] *: make sure snapshot save downloads SHA256 checksum ref. https://github.com/etcd-io/etcd/pull/11896 Signed-off-by: Gyuho Lee --- .words | 1 + clientv3/maintenance.go | 22 ++++++--- clientv3/snapshot/v3_snapshot.go | 29 +++++++----- etcdserver/api/snap/snapshotter.go | 6 +-- etcdserver/api/v3rpc/maintenance.go | 69 +++++++++++++++++++++++++---- 5 files changed, 101 insertions(+), 26 deletions(-) diff --git a/.words b/.words index f9192b75e2d5..88318bb662d8 100644 --- a/.words +++ b/.words @@ -76,6 +76,7 @@ consistentIndex todo saveWALAndSnap +SHA subconns nop SubConns diff --git a/clientv3/maintenance.go b/clientv3/maintenance.go index 744455a3b36d..44bb3c5a2434 100644 --- a/clientv3/maintenance.go +++ b/clientv3/maintenance.go @@ -20,6 +20,7 @@ import ( "io" pb "go.etcd.io/etcd/etcdserver/etcdserverpb" + "go.uber.org/zap" "google.golang.org/grpc" ) @@ -68,6 +69,7 @@ type Maintenance interface { } type maintenance struct { + lg *zap.Logger dial func(endpoint string) (pb.MaintenanceClient, func(), error) remote pb.MaintenanceClient callOpts []grpc.CallOption @@ -75,6 +77,7 @@ type maintenance struct { func NewMaintenance(c *Client) Maintenance { api := &maintenance{ + lg: c.lg, dial: func(endpoint string) (pb.MaintenanceClient, func(), error) { conn, err := c.Dial(endpoint) if err != nil { @@ -93,6 +96,7 @@ func NewMaintenance(c *Client) Maintenance { func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient, c *Client) Maintenance { api := &maintenance{ + lg: c.lg, dial: func(string) (pb.MaintenanceClient, func(), error) { return remote, func() {}, nil }, @@ -193,23 +197,31 @@ func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) { return nil, toErr(ctx, err) } + m.lg.Info("opened snapshot stream; downloading") pr, pw := io.Pipe() go func() { for { resp, err := ss.Recv() if err != nil { - pw.CloseWithError(err) + switch err { + case io.EOF: + m.lg.Info("completed snapshot read; closing") + default: + m.lg.Warn("failed to receive from snapshot stream; closing", zap.Error(err)) + } return } - if resp == nil && err == nil { - break - } + + // can "resp == nil && err == nil" + // before we receive snapshot SHA digest? + // No, server sends EOF with an empty response + // after it sends SHA digest at the end + if _, werr := pw.Write(resp.Blob); werr != nil { pw.CloseWithError(werr) return } } - pw.Close() }() return &snapshotReadCloser{ctx: ctx, ReadCloser: pr}, nil } diff --git a/clientv3/snapshot/v3_snapshot.go b/clientv3/snapshot/v3_snapshot.go index 791035e7db88..b87976188757 100644 --- a/clientv3/snapshot/v3_snapshot.go +++ b/clientv3/snapshot/v3_snapshot.go @@ -28,6 +28,7 @@ import ( "strings" "time" + "github.com/dustin/go-humanize" bolt "go.etcd.io/bbolt" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/etcdserver" @@ -88,6 +89,14 @@ type v3Manager struct { skipHashCheck bool } +// hasChecksum returns "true" if the file size "n" +// has appended sha256 hash digest. +func hasChecksum(n int64) bool { + // 512 is chosen because it's a minimum disk sector size + // smaller than (and multiplies to) OS page size in most systems + return (n % 512) == sha256.Size +} + // Save fetches snapshot from remote etcd server and saves data to target path. func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string) error { if len(cfg.Endpoints) != 1 { @@ -107,10 +116,7 @@ func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string if err != nil { return fmt.Errorf("could not open %s (%v)", partpath, err) } - s.lg.Info( - "created temporary db file", - zap.String("path", partpath), - ) + s.lg.Info("created temporary db file", zap.String("path", partpath)) now := time.Now() var rd io.ReadCloser @@ -118,13 +124,15 @@ func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string if err != nil { return err } - s.lg.Info( - "fetching snapshot", - zap.String("endpoint", cfg.Endpoints[0]), - ) - if _, err = io.Copy(f, rd); err != nil { + s.lg.Info("fetching snapshot", zap.String("endpoint", cfg.Endpoints[0])) + var size int64 + size, err = io.Copy(f, rd) + if err != nil { return err } + if !hasChecksum(size) { + return fmt.Errorf("sha256 checksum not found [bytes: %d]", size) + } if err = fileutil.Fsync(f); err != nil { return err } @@ -134,6 +142,7 @@ func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string s.lg.Info( "fetched snapshot", zap.String("endpoint", cfg.Endpoints[0]), + zap.String("size", humanize.Bytes(uint64(size))), zap.Duration("took", time.Since(now)), ) @@ -346,7 +355,7 @@ func (s *v3Manager) saveDB() error { if serr != nil { return serr } - hasHash := (off % 512) == sha256.Size + hasHash := hasChecksum(off) if hasHash { if err := db.Truncate(off - sha256.Size); err != nil { return err diff --git a/etcdserver/api/snap/snapshotter.go b/etcdserver/api/snap/snapshotter.go index c7fc5c108d88..c5d6d6183c1b 100644 --- a/etcdserver/api/snap/snapshotter.go +++ b/etcdserver/api/snap/snapshotter.go @@ -313,7 +313,7 @@ func (s *Snapshotter) ReleaseSnapDBs(snap raftpb.Snapshot) error { if s.lg != nil { s.lg.Warn("failed to parse index from filename", zap.String("path", filename), zap.String("error", err.Error())) } else { - plog.Warnf("failed to parse index from filename: %s (%v)", filename, err) + plog.Warningf("failed to parse index from filename: %s (%v)", filename, err) } continue } @@ -321,13 +321,13 @@ func (s *Snapshotter) ReleaseSnapDBs(snap raftpb.Snapshot) error { if s.lg != nil { s.lg.Warn("found orphaned .snap.db file; deleting", zap.String("path", filename)) } else { - plog.Warnf("found orphaned .snap.db file; deleting: %s", filename) + plog.Warningf("found orphaned .snap.db file; deleting: %s", filename) } if rmErr := os.Remove(filepath.Join(s.dir, filename)); rmErr != nil && !os.IsNotExist(rmErr) { if s.lg != nil { s.lg.Warn("failed to remove orphaned .snap.db file", zap.String("path", filename), zap.Error(rmErr)) } else { - plog.Warnf("failed to remove orphaned .snap.db file: %s (%v)", filename, rmErr) + plog.Warningf("failed to remove orphaned .snap.db file: %s (%v)", filename, rmErr) } } } diff --git a/etcdserver/api/v3rpc/maintenance.go b/etcdserver/api/v3rpc/maintenance.go index c51271ac0fe3..8130adbf0abb 100644 --- a/etcdserver/api/v3rpc/maintenance.go +++ b/etcdserver/api/v3rpc/maintenance.go @@ -18,7 +18,9 @@ import ( "context" "crypto/sha256" "io" + "time" + "github.com/dustin/go-humanize" "go.etcd.io/etcd/auth" "go.etcd.io/etcd/etcdserver" "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" @@ -98,6 +100,9 @@ func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRe return &pb.DefragmentResponse{}, nil } +// big enough size to hold >1 OS pages in the buffer +const snapshotSendBufferSize = 32 * 1024 + func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error { snap := ms.bg.Backend().Snapshot() pr, pw := io.Pipe() @@ -116,19 +121,46 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance pw.Close() }() - // send file data + // record SHA digest of snapshot data + // used for integrity checks during snapshot restore operation h := sha256.New() - br := int64(0) - buf := make([]byte, 32*1024) - sz := snap.Size() - for br < sz { + + // buffer just holds read bytes from stream + // response size is multiple of OS page size, fetched in boltdb + // e.g. 4*1024 + buf := make([]byte, snapshotSendBufferSize) + + sent := int64(0) + total := snap.Size() + size := humanize.Bytes(uint64(total)) + + start := time.Now() + if ms.lg != nil { + ms.lg.Info("sending database snapshot to client", + zap.Int64("total-bytes", total), + zap.String("size", size), + ) + } else { + plog.Infof("sending database snapshot to client %s [%d bytes]", size, total) + } + for total-sent > 0 { n, err := io.ReadFull(pr, buf) if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { return togRPCError(err) } - br += int64(n) + sent += int64(n) + + // if total is x * snapshotSendBufferSize. it is possible that + // resp.RemainingBytes == 0 + // resp.Blob == zero byte but not nil + // does this make server response sent to client nil in proto + // and client stops receiving from snapshot stream before + // server sends snapshot SHA? + // No, the client will still receive non-nil response + // until server closes the stream with EOF + resp := &pb.SnapshotResponse{ - RemainingBytes: uint64(sz - br), + RemainingBytes: uint64(total - sent), Blob: buf[:n], } if err = srv.Send(resp); err != nil { @@ -137,13 +169,34 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance h.Write(buf[:n]) } - // send sha + // send SHA digest for integrity checks + // during snapshot restore operation sha := h.Sum(nil) + + if ms.lg != nil { + ms.lg.Info("sending database sha256 checksum to client", + zap.Int64("total-bytes", total), + zap.Int("checksum-size", len(sha)), + ) + } else { + plog.Infof("sending database sha256 checksum to client [%d bytes]", len(sha)) + } + hresp := &pb.SnapshotResponse{RemainingBytes: 0, Blob: sha} if err := srv.Send(hresp); err != nil { return togRPCError(err) } + if ms.lg != nil { + ms.lg.Info("successfully sent database snapshot to client", + zap.Int64("total-bytes", total), + zap.String("size", size), + zap.String("took", humanize.Time(start)), + ) + } else { + plog.Infof("successfully sent database snapshot to client %s [%d bytes]", size, total) + } + return nil }