Skip to content

Commit

Permalink
Create a v2 snapshot when running etcdutl migrate command
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr committed Jan 10, 2025
1 parent 9db8dcb commit 1b2204b
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 42 deletions.
1 change: 1 addition & 0 deletions etcdutl/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func init() {
etcdutl.NewVersionCommand(),
etcdutl.NewCompletionCommand(),
etcdutl.NewMigrateCommand(),
etcdutl.NewV2SnapshotCommand(), // TODO: remove in 3.8

Check warning on line 48 in etcdutl/ctl.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/ctl.go#L48

Added line #L48 was not covered by tests
)
}

Expand Down
120 changes: 120 additions & 0 deletions etcdutl/etcdutl/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,24 @@
package etcdutl

import (
"errors"
"fmt"

"github.com/coreos/go-semver/semver"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/pkg/v3/cobrautl"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
"go.etcd.io/raft/v3/raftpb"
)

func GetLogger() *zap.Logger {
Expand All @@ -32,3 +45,110 @@ func GetLogger() *zap.Logger {
}
return lg
}

func getLatestWALSnap(lg *zap.Logger, dataDir string) (walpb.Snapshot, error) {
snapshot, err := getLatestV2Snapshot(lg, dataDir)
if err != nil {
return walpb.Snapshot{}, err

Check warning on line 52 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L52

Added line #L52 was not covered by tests
}

var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
}
return walsnap, nil
}

func getLatestV2Snapshot(lg *zap.Logger, dataDir string) (*raftpb.Snapshot, error) {
walPath := datadir.ToWALDir(dataDir)
walSnaps, err := wal.ValidSnapshotEntries(lg, walPath)
if err != nil {
return nil, err

Check warning on line 66 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L66

Added line #L66 was not covered by tests
}

ss := snap.New(lg, datadir.ToSnapDir(dataDir))
snapshot, err := ss.LoadNewestAvailable(walSnaps)
if err != nil && !errors.Is(err, snap.ErrNoSnapshot) {
return nil, err

Check warning on line 72 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L72

Added line #L72 was not covered by tests
}

return snapshot, nil
}

func createV2SnapshotFromV3Store(dataDir string, be backend.Backend) error {
var (
lg = GetLogger()

Check warning on line 80 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L78-L80

Added lines #L78 - L80 were not covered by tests

snapDir = datadir.ToSnapDir(dataDir)
walDir = datadir.ToWALDir(dataDir)
)

Check warning on line 84 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L82-L84

Added lines #L82 - L84 were not covered by tests

ci, term := schema.ReadConsistentIndex(be.ReadTx())

Check warning on line 86 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L86

Added line #L86 was not covered by tests

cl := membership.NewCluster(lg)
cl.SetBackend(schema.NewMembershipBackend(lg, be))
cl.Recover(func(*zap.Logger, *semver.Version) {})

Check warning on line 90 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L88-L90

Added lines #L88 - L90 were not covered by tests

latestWalSnap, err := getLatestWALSnap(lg, dataDir)
if err != nil {
return err

Check warning on line 94 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L92-L94

Added lines #L92 - L94 were not covered by tests
}

// Each time before creating the v2 snapshot, etcdserve always flush
// the backend storage (bbolt db), so the consistent index should never
// less than the Index or term of the latest snapshot.
if ci < latestWalSnap.Index || term < latestWalSnap.Term {

Check warning on line 100 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L100

Added line #L100 was not covered by tests
// This should never happen
return fmt.Errorf("consistent_index [Index: %d, Term: %d] is less than the latest snapshot [Index: %d, Term: %d]", ci, term, latestWalSnap.Index, latestWalSnap.Term)

Check warning on line 102 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L102

Added line #L102 was not covered by tests
}

voters, learners := getVotersAndLearners(cl)
confState := raftpb.ConfState{
Voters: voters,
Learners: learners,

Check warning on line 108 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L105-L108

Added lines #L105 - L108 were not covered by tests
}

// create the v2 snaspshot file
raftSnap := raftpb.Snapshot{
Data: etcdserver.GetMembershipInfoInV2Format(lg, cl),
Metadata: raftpb.SnapshotMetadata{
Index: ci,
Term: term,
ConfState: confState,
},

Check warning on line 118 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L112-L118

Added lines #L112 - L118 were not covered by tests
}
sn := snap.New(lg, snapDir)
if err := sn.SaveSnap(raftSnap); err != nil {
return err

Check warning on line 122 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L120-L122

Added lines #L120 - L122 were not covered by tests
}

// save WAL snapshot record
w, err := wal.Open(lg, walDir, latestWalSnap)
if err != nil {
return err

Check warning on line 128 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L126-L128

Added lines #L126 - L128 were not covered by tests
}
defer w.Close()

Check warning on line 130 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L130

Added line #L130 was not covered by tests
// We must read all records to locate the tail of the last valid WAL file.
if _, _, _, err = w.ReadAll(); err != nil {
return err

Check warning on line 133 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L132-L133

Added lines #L132 - L133 were not covered by tests
}

return w.SaveSnapshot(walpb.Snapshot{Index: ci, Term: term, ConfState: &confState})

Check warning on line 136 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L136

Added line #L136 was not covered by tests
}

func getVotersAndLearners(cl *membership.RaftCluster) ([]uint64, []uint64) {
var (
voters []uint64
learners []uint64
)
for _, m := range cl.Members() {
if m.IsLearner {
learners = append(learners, uint64(m.ID))
continue

Check warning on line 147 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L139-L147

Added lines #L139 - L147 were not covered by tests
}

voters = append(voters, uint64(m.ID))

Check warning on line 150 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L150

Added line #L150 was not covered by tests
}

return voters, learners

Check warning on line 153 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L153

Added line #L153 was not covered by tests
}
86 changes: 44 additions & 42 deletions etcdutl/etcdutl/migrate_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package etcdutl

import (
"errors"
"fmt"
"strings"

Expand All @@ -25,12 +24,10 @@ import (

"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/pkg/v3/cobrautl"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
)

// NewMigrateCommand prints out the version of etcd.
Expand Down Expand Up @@ -77,8 +74,9 @@ func (o *migrateOptions) AddFlags(cmd *cobra.Command) {

func (o *migrateOptions) Config() (*migrateConfig, error) {
c := &migrateConfig{
force: o.force,
lg: GetLogger(),
force: o.force,
dataDir: o.dataDir,
lg: GetLogger(),

Check warning on line 79 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L77-L79

Added lines #L77 - L79 were not covered by tests
}
var err error
dotCount := strings.Count(o.targetVersion, ".")
Expand All @@ -93,67 +91,69 @@ func (o *migrateOptions) Config() (*migrateConfig, error) {
return nil, fmt.Errorf(`target version %q not supported. Minimal "3.5"`, storageVersionToString(c.targetVersion))
}

dbPath := datadir.ToBackendFileName(o.dataDir)
c.be = backend.NewDefaultBackend(GetLogger(), dbPath)
return c, nil

Check warning on line 94 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L94

Added line #L94 was not covered by tests
}

walPath := datadir.ToWALDir(o.dataDir)
walSnap, err := getLatestWALSnap(c.lg, o.dataDir)
type migrateConfig struct {
lg *zap.Logger
be backend.Backend
targetVersion *semver.Version
walVersion schema.WALVersion
dataDir string
force bool
}

func (c *migrateConfig) finalize() error {
walPath := datadir.ToWALDir(c.dataDir)
walSnap, err := getLatestWALSnap(c.lg, c.dataDir)

Check warning on line 108 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L106-L108

Added lines #L106 - L108 were not covered by tests
if err != nil {
return nil, fmt.Errorf("failed to get the lastest snapshot: %w", err)
return fmt.Errorf("failed to get the lastest snapshot: %w", err)

Check warning on line 110 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L110

Added line #L110 was not covered by tests
}
w, err := wal.OpenForRead(c.lg, walPath, walSnap)
if err != nil {
return nil, fmt.Errorf(`failed to open wal: %w`, err)
return fmt.Errorf(`failed to open wal: %w`, err)

Check warning on line 114 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L114

Added line #L114 was not covered by tests
}
defer w.Close()
c.walVersion, err = wal.ReadWALVersion(w)
if err != nil {
return nil, fmt.Errorf(`failed to read wal: %w`, err)
}

return c, nil
}

func getLatestWALSnap(lg *zap.Logger, dataDir string) (walpb.Snapshot, error) {
walPath := datadir.ToWALDir(dataDir)
walSnaps, err := wal.ValidSnapshotEntries(lg, walPath)
if err != nil {
return walpb.Snapshot{}, err
}

ss := snap.New(lg, datadir.ToSnapDir(dataDir))
snapshot, err := ss.LoadNewestAvailable(walSnaps)
if err != nil && !errors.Is(err, snap.ErrNoSnapshot) {
return walpb.Snapshot{}, err
}

var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
return fmt.Errorf(`failed to read wal: %w`, err)

Check warning on line 119 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L119

Added line #L119 was not covered by tests
}
return walsnap, nil
}

type migrateConfig struct {
lg *zap.Logger
be backend.Backend
targetVersion *semver.Version
walVersion schema.WALVersion
force bool
return nil

Check warning on line 122 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L122

Added line #L122 was not covered by tests
}

func migrateCommandFunc(c *migrateConfig) error {
dbPath := datadir.ToBackendFileName(c.dataDir)
c.be = backend.NewDefaultBackend(GetLogger(), dbPath)

Check warning on line 127 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L126-L127

Added lines #L126 - L127 were not covered by tests
defer c.be.Close()

tx := c.be.BatchTx()
current, err := schema.DetectSchemaVersion(c.lg, c.be.ReadTx())
if err != nil {
c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older")
c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older", zap.Error(err))

Check warning on line 133 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L133

Added line #L133 was not covered by tests
return err
}
if current == *c.targetVersion {
c.lg.Info("storage version up-to-date", zap.String("storage-version", storageVersionToString(&current)))
return nil
}

// Update cluster version
be := schema.NewMembershipBackend(c.lg, c.be)
be.MustSaveClusterVersionToBackend(c.targetVersion)

Check warning on line 143 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L142-L143

Added lines #L142 - L143 were not covered by tests

// forcibly create a v2 snapshot file
// TODO: remove in 3.8
if err = createV2SnapshotFromV3Store(c.dataDir, c.be); err != nil {
c.lg.Error("Failed to create v2 snapshot file", zap.Error(err))
return err

Check warning on line 149 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L147-L149

Added lines #L147 - L149 were not covered by tests
}

if err = c.finalize(); err != nil {
c.lg.Error("Failed to finalize config", zap.Error(err))
return err

Check warning on line 154 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L152-L154

Added lines #L152 - L154 were not covered by tests
}

err = schema.Migrate(c.lg, tx, c.walVersion, *c.targetVersion)
if err != nil {
if !c.force {
Expand All @@ -162,7 +162,9 @@ func migrateCommandFunc(c *migrateConfig) error {
c.lg.Info("normal migrate failed, trying with force", zap.Error(err))
migrateForce(c.lg, tx, c.targetVersion)
}

c.be.ForceCommit()

return nil
}

Expand Down
61 changes: 61 additions & 0 deletions etcdutl/etcdutl/v2snapshot_command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2025 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdutl

import (
"fmt"
"path/filepath"

"github.com/spf13/cobra"

"go.etcd.io/etcd/pkg/v3/cobrautl"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/datadir"
)

var createV2SnapDataDir string

// NewV2SnapshotCommand returns the cobra command for "v2snapshot".
// TODO: remove the command in 3.8
func NewV2SnapshotCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "v2snapshot <subcommand>",
Short: "Manages etcd node v2snapshots",

Check warning on line 35 in etcdutl/etcdutl/v2snapshot_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/v2snapshot_command.go#L32-L35

Added lines #L32 - L35 were not covered by tests
}
cmd.AddCommand(newV2SnapshotCreateCommand())
return cmd

Check warning on line 38 in etcdutl/etcdutl/v2snapshot_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/v2snapshot_command.go#L37-L38

Added lines #L37 - L38 were not covered by tests
}

func newV2SnapshotCreateCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "create",
Short: "Create a new v2 snapshot file",
Run: v2SnapshotCreateFunc,

Check warning on line 45 in etcdutl/etcdutl/v2snapshot_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/v2snapshot_command.go#L41-L45

Added lines #L41 - L45 were not covered by tests
}
cmd.Flags().StringVar(&createV2SnapDataDir, "data-dir", "", "Required. Path to the etcd data directory.")
cmd.MarkFlagRequired("data-dir")
cmd.MarkFlagDirname("data-dir")
return cmd

Check warning on line 50 in etcdutl/etcdutl/v2snapshot_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/v2snapshot_command.go#L47-L50

Added lines #L47 - L50 were not covered by tests
}

func v2SnapshotCreateFunc(_ *cobra.Command, _ []string) {
be := backend.NewDefaultBackend(GetLogger(), filepath.Join(datadir.ToSnapDir(createV2SnapDataDir), "db"))
defer be.Close()

Check warning on line 55 in etcdutl/etcdutl/v2snapshot_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/v2snapshot_command.go#L53-L55

Added lines #L53 - L55 were not covered by tests

if err := createV2SnapshotFromV3Store(createV2SnapDataDir, be); err != nil {
cobrautl.ExitWithError(cobrautl.ExitError, err)

Check warning on line 58 in etcdutl/etcdutl/v2snapshot_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/v2snapshot_command.go#L57-L58

Added lines #L57 - L58 were not covered by tests
}
fmt.Println("Created a v2 snapshot file.")

Check warning on line 60 in etcdutl/etcdutl/v2snapshot_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/v2snapshot_command.go#L60

Added line #L60 was not covered by tests
}

0 comments on commit 1b2204b

Please sign in to comment.