Skip to content

Commit

Permalink
Create a v2 snapshot when running etcdutl migrate command
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr committed Jan 10, 2025
1 parent 9db8dcb commit 033f4cf
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 47 deletions.
1 change: 1 addition & 0 deletions etcdutl/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func init() {
etcdutl.NewVersionCommand(),
etcdutl.NewCompletionCommand(),
etcdutl.NewMigrateCommand(),
etcdutl.NewV2SnapshotCommand(), // TODO: remove in 3.8

Check warning on line 48 in etcdutl/ctl.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/ctl.go#L48

Added line #L48 was not covered by tests
)
}

Expand Down
119 changes: 119 additions & 0 deletions etcdutl/etcdutl/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,23 @@
package etcdutl

import (
"errors"
"fmt"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/pkg/v3/cobrautl"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
"go.etcd.io/raft/v3/raftpb"
)

func GetLogger() *zap.Logger {
Expand All @@ -32,3 +44,110 @@ func GetLogger() *zap.Logger {
}
return lg
}

func getLatestWALSnap(lg *zap.Logger, dataDir string) (walpb.Snapshot, error) {
snapshot, err := getLatestV2Snapshot(lg, dataDir)
if err != nil {
return walpb.Snapshot{}, err

Check warning on line 51 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L51

Added line #L51 was not covered by tests
}

var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
}
return walsnap, nil
}

func getLatestV2Snapshot(lg *zap.Logger, dataDir string) (*raftpb.Snapshot, error) {
walPath := datadir.ToWALDir(dataDir)
walSnaps, err := wal.ValidSnapshotEntries(lg, walPath)
if err != nil {
return nil, err

Check warning on line 65 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L65

Added line #L65 was not covered by tests
}

ss := snap.New(lg, datadir.ToSnapDir(dataDir))
snapshot, err := ss.LoadNewestAvailable(walSnaps)
if err != nil && !errors.Is(err, snap.ErrNoSnapshot) {
return nil, err

Check warning on line 71 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L71

Added line #L71 was not covered by tests
}

return snapshot, nil
}

func createV2SnapshotFromV3Store(dataDir string, be backend.Backend) error {
var (
lg = GetLogger()

Check warning on line 79 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L77-L79

Added lines #L77 - L79 were not covered by tests

snapDir = datadir.ToSnapDir(dataDir)
walDir = datadir.ToWALDir(dataDir)
)

Check warning on line 83 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L81-L83

Added lines #L81 - L83 were not covered by tests

ci, term := schema.ReadConsistentIndex(be.ReadTx())

Check warning on line 85 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L85

Added line #L85 was not covered by tests

cl := membership.NewCluster(lg)
cl.SetBackend(schema.NewMembershipBackend(lg, be))
cl.Load()

Check warning on line 89 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L87-L89

Added lines #L87 - L89 were not covered by tests

latestWALSnap, err := getLatestWALSnap(lg, dataDir)
if err != nil {
return err

Check warning on line 93 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L91-L93

Added lines #L91 - L93 were not covered by tests
}

// Each time before creating the v2 snapshot, etcdserve always flush
// the backend storage (bbolt db), so the consistent index should never
// less than the Index or term of the latest snapshot.
if ci < latestWALSnap.Index || term < latestWALSnap.Term {

Check warning on line 99 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L99

Added line #L99 was not covered by tests
// This should never happen
return fmt.Errorf("consistent_index [Index: %d, Term: %d] is less than the latest snapshot [Index: %d, Term: %d]", ci, term, latestWALSnap.Index, latestWALSnap.Term)

Check warning on line 101 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L101

Added line #L101 was not covered by tests
}

voters, learners := getVotersAndLearners(cl)
confState := raftpb.ConfState{
Voters: voters,
Learners: learners,

Check warning on line 107 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L104-L107

Added lines #L104 - L107 were not covered by tests
}

// create the v2 snaspshot file
raftSnap := raftpb.Snapshot{
Data: etcdserver.GetMembershipInfoInV2Format(lg, cl),
Metadata: raftpb.SnapshotMetadata{
Index: ci,
Term: term,
ConfState: confState,
},

Check warning on line 117 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L111-L117

Added lines #L111 - L117 were not covered by tests
}
sn := snap.New(lg, snapDir)
if err = sn.SaveSnap(raftSnap); err != nil {
return err

Check warning on line 121 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L119-L121

Added lines #L119 - L121 were not covered by tests
}

// save WAL snapshot record
w, err := wal.Open(lg, walDir, latestWALSnap)
if err != nil {
return err

Check warning on line 127 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L125-L127

Added lines #L125 - L127 were not covered by tests
}
defer w.Close()

Check warning on line 129 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L129

Added line #L129 was not covered by tests
// We must read all records to locate the tail of the last valid WAL file.
if _, _, _, err = w.ReadAll(); err != nil {
return err

Check warning on line 132 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L131-L132

Added lines #L131 - L132 were not covered by tests
}

return w.SaveSnapshot(walpb.Snapshot{Index: ci, Term: term, ConfState: &confState})

Check warning on line 135 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L135

Added line #L135 was not covered by tests
}

func getVotersAndLearners(cl *membership.RaftCluster) ([]uint64, []uint64) {
var (
voters []uint64
learners []uint64
)
for _, m := range cl.Members() {
if m.IsLearner {
learners = append(learners, uint64(m.ID))
continue

Check warning on line 146 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L138-L146

Added lines #L138 - L146 were not covered by tests
}

voters = append(voters, uint64(m.ID))

Check warning on line 149 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L149

Added line #L149 was not covered by tests
}

return voters, learners

Check warning on line 152 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L152

Added line #L152 was not covered by tests
}
86 changes: 44 additions & 42 deletions etcdutl/etcdutl/migrate_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package etcdutl

import (
"errors"
"fmt"
"strings"

Expand All @@ -25,12 +24,10 @@ import (

"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/pkg/v3/cobrautl"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
)

// NewMigrateCommand prints out the version of etcd.
Expand Down Expand Up @@ -77,8 +74,9 @@ func (o *migrateOptions) AddFlags(cmd *cobra.Command) {

func (o *migrateOptions) Config() (*migrateConfig, error) {
c := &migrateConfig{
force: o.force,
lg: GetLogger(),
force: o.force,
dataDir: o.dataDir,
lg: GetLogger(),

Check warning on line 79 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L77-L79

Added lines #L77 - L79 were not covered by tests
}
var err error
dotCount := strings.Count(o.targetVersion, ".")
Expand All @@ -93,67 +91,69 @@ func (o *migrateOptions) Config() (*migrateConfig, error) {
return nil, fmt.Errorf(`target version %q not supported. Minimal "3.5"`, storageVersionToString(c.targetVersion))
}

dbPath := datadir.ToBackendFileName(o.dataDir)
c.be = backend.NewDefaultBackend(GetLogger(), dbPath)
return c, nil

Check warning on line 94 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L94

Added line #L94 was not covered by tests
}

walPath := datadir.ToWALDir(o.dataDir)
walSnap, err := getLatestWALSnap(c.lg, o.dataDir)
type migrateConfig struct {
lg *zap.Logger
be backend.Backend
targetVersion *semver.Version
walVersion schema.WALVersion
dataDir string
force bool
}

func (c *migrateConfig) finalize() error {
walPath := datadir.ToWALDir(c.dataDir)
walSnap, err := getLatestWALSnap(c.lg, c.dataDir)

Check warning on line 108 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L106-L108

Added lines #L106 - L108 were not covered by tests
if err != nil {
return nil, fmt.Errorf("failed to get the lastest snapshot: %w", err)
return fmt.Errorf("failed to get the lastest snapshot: %w", err)

Check warning on line 110 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L110

Added line #L110 was not covered by tests
}
w, err := wal.OpenForRead(c.lg, walPath, walSnap)
if err != nil {
return nil, fmt.Errorf(`failed to open wal: %w`, err)
return fmt.Errorf(`failed to open wal: %w`, err)

Check warning on line 114 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L114

Added line #L114 was not covered by tests
}
defer w.Close()
c.walVersion, err = wal.ReadWALVersion(w)
if err != nil {
return nil, fmt.Errorf(`failed to read wal: %w`, err)
}

return c, nil
}

func getLatestWALSnap(lg *zap.Logger, dataDir string) (walpb.Snapshot, error) {
walPath := datadir.ToWALDir(dataDir)
walSnaps, err := wal.ValidSnapshotEntries(lg, walPath)
if err != nil {
return walpb.Snapshot{}, err
}

ss := snap.New(lg, datadir.ToSnapDir(dataDir))
snapshot, err := ss.LoadNewestAvailable(walSnaps)
if err != nil && !errors.Is(err, snap.ErrNoSnapshot) {
return walpb.Snapshot{}, err
}

var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
return fmt.Errorf(`failed to read wal: %w`, err)

Check warning on line 119 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L119

Added line #L119 was not covered by tests
}
return walsnap, nil
}

type migrateConfig struct {
lg *zap.Logger
be backend.Backend
targetVersion *semver.Version
walVersion schema.WALVersion
force bool
return nil

Check warning on line 122 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L122

Added line #L122 was not covered by tests
}

func migrateCommandFunc(c *migrateConfig) error {
dbPath := datadir.ToBackendFileName(c.dataDir)
c.be = backend.NewDefaultBackend(GetLogger(), dbPath)

Check warning on line 127 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L126-L127

Added lines #L126 - L127 were not covered by tests
defer c.be.Close()

tx := c.be.BatchTx()
current, err := schema.DetectSchemaVersion(c.lg, c.be.ReadTx())
if err != nil {
c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older")
c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older", zap.Error(err))

Check warning on line 133 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L133

Added line #L133 was not covered by tests
return err
}
if current == *c.targetVersion {
c.lg.Info("storage version up-to-date", zap.String("storage-version", storageVersionToString(&current)))
return nil
}

// Update cluster version
be := schema.NewMembershipBackend(c.lg, c.be)
be.MustSaveClusterVersionToBackend(c.targetVersion)

Check warning on line 143 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L142-L143

Added lines #L142 - L143 were not covered by tests

// forcibly create a v2 snapshot file
// TODO: remove in 3.8
if err = createV2SnapshotFromV3Store(c.dataDir, c.be); err != nil {
c.lg.Error("Failed to create v2 snapshot file", zap.Error(err))
return err

Check warning on line 149 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L147-L149

Added lines #L147 - L149 were not covered by tests
}

if err = c.finalize(); err != nil {
c.lg.Error("Failed to finalize config", zap.Error(err))
return err

Check warning on line 154 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L152-L154

Added lines #L152 - L154 were not covered by tests
}

err = schema.Migrate(c.lg, tx, c.walVersion, *c.targetVersion)
if err != nil {
if !c.force {
Expand All @@ -162,7 +162,9 @@ func migrateCommandFunc(c *migrateConfig) error {
c.lg.Info("normal migrate failed, trying with force", zap.Error(err))
migrateForce(c.lg, tx, c.targetVersion)
}

c.be.ForceCommit()

return nil
}

Expand Down
61 changes: 61 additions & 0 deletions etcdutl/etcdutl/v2snapshot_command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2025 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdutl

import (
"fmt"
"path/filepath"

"github.com/spf13/cobra"

"go.etcd.io/etcd/pkg/v3/cobrautl"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/datadir"
)

var createV2SnapDataDir string

// NewV2SnapshotCommand returns the cobra command for "v2snapshot".
// TODO: remove the command in 3.8
func NewV2SnapshotCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "v2snapshot <subcommand>",
Short: "Manages etcd node v2snapshots",

Check warning on line 35 in etcdutl/etcdutl/v2snapshot_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/v2snapshot_command.go#L32-L35

Added lines #L32 - L35 were not covered by tests
}
cmd.AddCommand(newV2SnapshotCreateCommand())
return cmd

Check warning on line 38 in etcdutl/etcdutl/v2snapshot_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/v2snapshot_command.go#L37-L38

Added lines #L37 - L38 were not covered by tests
}

func newV2SnapshotCreateCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "create",
Short: "Create a new v2 snapshot file",
Run: v2SnapshotCreateFunc,

Check warning on line 45 in etcdutl/etcdutl/v2snapshot_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/v2snapshot_command.go#L41-L45

Added lines #L41 - L45 were not covered by tests
}
cmd.Flags().StringVar(&createV2SnapDataDir, "data-dir", "", "Required. Path to the etcd data directory.")
cmd.MarkFlagRequired("data-dir")
cmd.MarkFlagDirname("data-dir")
return cmd

Check warning on line 50 in etcdutl/etcdutl/v2snapshot_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/v2snapshot_command.go#L47-L50

Added lines #L47 - L50 were not covered by tests
}

func v2SnapshotCreateFunc(_ *cobra.Command, _ []string) {
be := backend.NewDefaultBackend(GetLogger(), filepath.Join(datadir.ToSnapDir(createV2SnapDataDir), "db"))
defer be.Close()

Check warning on line 55 in etcdutl/etcdutl/v2snapshot_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/v2snapshot_command.go#L53-L55

Added lines #L53 - L55 were not covered by tests

if err := createV2SnapshotFromV3Store(createV2SnapDataDir, be); err != nil {
cobrautl.ExitWithError(cobrautl.ExitError, err)

Check warning on line 58 in etcdutl/etcdutl/v2snapshot_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/v2snapshot_command.go#L57-L58

Added lines #L57 - L58 were not covered by tests
}
fmt.Println("Created a v2 snapshot file.")

Check warning on line 60 in etcdutl/etcdutl/v2snapshot_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/v2snapshot_command.go#L60

Added line #L60 was not covered by tests
}
16 changes: 11 additions & 5 deletions server/etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,22 +256,28 @@ func (c *RaftCluster) SetVersionChangedNotifier(n *notify.Notifier) {
c.versionChanged = n
}

func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
c.Lock()
defer c.Unlock()

func (c *RaftCluster) Load() {
if c.be != nil {
c.version = c.be.ClusterVersionFromBackend()
c.members, c.removed = c.be.MustReadMembersFromBackend()
} else {
c.version = clusterVersionFromStore(c.lg, c.v2store)
c.members, c.removed = membersFromStore(c.lg, c.v2store)
}
c.buildMembershipMetric()

if c.be != nil {
c.downgradeInfo = c.be.DowngradeInfoFromBackend()
}
}

func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
c.Lock()
defer c.Unlock()

c.Load()

c.buildMembershipMetric()

sv := semver.Must(semver.NewVersion(version.Version))
if c.downgradeInfo != nil && c.downgradeInfo.Enabled {
c.lg.Info(
Expand Down

0 comments on commit 033f4cf

Please sign in to comment.