From d17821f712a83e435e6c9f9e14cec3e839469810 Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Mon, 6 Jan 2025 15:20:19 +0000 Subject: [PATCH] Update etcdutl migrate command: load wal records from the latest snapshot Signed-off-by: Benjamin Wang --- etcdutl/etcdutl/migrate_command.go | 28 ++++- etcdutl/etcdutl/migrate_command_test.go | 143 ++++++++++++++++++++++++ 2 files changed, 170 insertions(+), 1 deletion(-) create mode 100644 etcdutl/etcdutl/migrate_command_test.go diff --git a/etcdutl/etcdutl/migrate_command.go b/etcdutl/etcdutl/migrate_command.go index 15188984f85..7f435b80a28 100644 --- a/etcdutl/etcdutl/migrate_command.go +++ b/etcdutl/etcdutl/migrate_command.go @@ -15,6 +15,7 @@ package etcdutl import ( + "errors" "fmt" "strings" @@ -24,6 +25,7 @@ import ( "go.etcd.io/etcd/api/v3/version" "go.etcd.io/etcd/pkg/v3/cobrautl" + "go.etcd.io/etcd/server/v3/etcdserver/api/snap" "go.etcd.io/etcd/server/v3/storage/backend" "go.etcd.io/etcd/server/v3/storage/datadir" "go.etcd.io/etcd/server/v3/storage/schema" @@ -95,7 +97,11 @@ func (o *migrateOptions) Config() (*migrateConfig, error) { c.be = backend.NewDefaultBackend(GetLogger(), dbPath) walPath := datadir.ToWALDir(o.dataDir) - w, err := wal.OpenForRead(c.lg, walPath, walpb.Snapshot{}) + walSnap, err := getLatestWALSnap(c.lg, o.dataDir) + if err != nil { + return nil, fmt.Errorf("failed to get the lastest snapshot: %w", err) + } + w, err := wal.OpenForRead(c.lg, walPath, walSnap) if err != nil { return nil, fmt.Errorf(`failed to open wal: %w`, err) } @@ -108,6 +114,26 @@ func (o *migrateOptions) Config() (*migrateConfig, error) { return c, nil } +func getLatestWALSnap(lg *zap.Logger, dataDir string) (walpb.Snapshot, error) { + walPath := datadir.ToWALDir(dataDir) + walSnaps, err := wal.ValidSnapshotEntries(lg, walPath) + if err != nil { + return walpb.Snapshot{}, err + } + + ss := snap.New(lg, datadir.ToSnapDir(dataDir)) + snapshot, err := ss.LoadNewestAvailable(walSnaps) + if err != nil && !errors.Is(err, snap.ErrNoSnapshot) { + return walpb.Snapshot{}, err + } + + var walsnap walpb.Snapshot + if snapshot != nil { + walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term + } + return walsnap, nil +} + type migrateConfig struct { lg *zap.Logger be backend.Backend diff --git a/etcdutl/etcdutl/migrate_command_test.go b/etcdutl/etcdutl/migrate_command_test.go new file mode 100644 index 00000000000..e2acedfa2e7 --- /dev/null +++ b/etcdutl/etcdutl/migrate_command_test.go @@ -0,0 +1,143 @@ +// Copyright 2025 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdutl + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/client/pkg/v3/fileutil" + "go.etcd.io/etcd/pkg/v3/pbutil" + "go.etcd.io/etcd/server/v3/etcdserver/api/snap" + "go.etcd.io/etcd/server/v3/storage/datadir" + "go.etcd.io/etcd/server/v3/storage/wal" + "go.etcd.io/etcd/server/v3/storage/wal/walpb" + "go.etcd.io/raft/v3/raftpb" +) + +func TestGetLatestWalSnap(t *testing.T) { + testCases := []struct { + name string + walSnaps []walpb.Snapshot + snapshots []raftpb.Snapshot + expectedLatestWALSnap walpb.Snapshot + }{ + { + name: "wal snapshot records match the snapshot files", + walSnaps: []walpb.Snapshot{ + {Index: 10, Term: 2}, + {Index: 20, Term: 3}, + {Index: 30, Term: 5}, + }, + snapshots: []raftpb.Snapshot{ + {Metadata: raftpb.SnapshotMetadata{Index: 10, Term: 2}}, + {Metadata: raftpb.SnapshotMetadata{Index: 20, Term: 3}}, + {Metadata: raftpb.SnapshotMetadata{Index: 30, Term: 5}}, + }, + expectedLatestWALSnap: walpb.Snapshot{Index: 30, Term: 5}, + }, + { + name: "there are orphan snapshot files", + walSnaps: []walpb.Snapshot{ + {Index: 10, Term: 2}, + {Index: 20, Term: 3}, + {Index: 35, Term: 5}, + }, + snapshots: []raftpb.Snapshot{ + {Metadata: raftpb.SnapshotMetadata{Index: 10, Term: 2}}, + {Metadata: raftpb.SnapshotMetadata{Index: 20, Term: 3}}, + {Metadata: raftpb.SnapshotMetadata{Index: 35, Term: 5}}, + {Metadata: raftpb.SnapshotMetadata{Index: 40, Term: 6}}, + {Metadata: raftpb.SnapshotMetadata{Index: 50, Term: 7}}, + }, + expectedLatestWALSnap: walpb.Snapshot{Index: 35, Term: 5}, + }, + { + name: "there are orphan snapshot records in wal file", + walSnaps: []walpb.Snapshot{ + {Index: 10, Term: 2}, + {Index: 20, Term: 3}, + {Index: 30, Term: 4}, + {Index: 45, Term: 5}, + {Index: 55, Term: 6}, + }, + snapshots: []raftpb.Snapshot{ + {Metadata: raftpb.SnapshotMetadata{Index: 10, Term: 2}}, + {Metadata: raftpb.SnapshotMetadata{Index: 20, Term: 3}}, + {Metadata: raftpb.SnapshotMetadata{Index: 30, Term: 4}}, + }, + expectedLatestWALSnap: walpb.Snapshot{Index: 30, Term: 4}, + }, + { + name: "wal snapshot records do not match the snapshot files at all", + walSnaps: []walpb.Snapshot{ + {Index: 10, Term: 2}, + {Index: 20, Term: 3}, + {Index: 30, Term: 4}, + }, + snapshots: []raftpb.Snapshot{ + {Metadata: raftpb.SnapshotMetadata{Index: 40, Term: 5}}, + {Metadata: raftpb.SnapshotMetadata{Index: 50, Term: 6}}, + }, + expectedLatestWALSnap: walpb.Snapshot{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + dataDir := t.TempDir() + lg := zap.NewNop() + + require.NoError(t, fileutil.TouchDirAll(lg, datadir.ToMemberDir(dataDir))) + require.NoError(t, fileutil.TouchDirAll(lg, datadir.ToWALDir(dataDir))) + require.NoError(t, fileutil.TouchDirAll(lg, datadir.ToSnapDir(dataDir))) + + // populate wal file + w, err := wal.Create(lg, datadir.ToWALDir(dataDir), pbutil.MustMarshal( + &etcdserverpb.Metadata{ + NodeID: 1, + ClusterID: 2, + }, + )) + require.NoError(t, err) + + for _, walSnap := range tc.walSnaps { + walSnap.ConfState = &raftpb.ConfState{Voters: []uint64{1}} + walErr := w.SaveSnapshot(walSnap) + require.NoError(t, walErr) + walErr = w.Save(raftpb.HardState{Term: walSnap.Term, Commit: walSnap.Index, Vote: 1}, nil) + require.NoError(t, walErr) + } + err = w.Close() + require.NoError(t, err) + + // generate snapshot files + ss := snap.New(lg, datadir.ToSnapDir(dataDir)) + for _, snap := range tc.snapshots { + snap.Metadata.ConfState = raftpb.ConfState{Voters: []uint64{1}} + snapErr := ss.SaveSnap(snap) + require.NoError(t, snapErr) + } + + walSnap, err := getLatestWALSnap(lg, dataDir) + require.NoError(t, err) + + require.Equal(t, tc.expectedLatestWALSnap, walSnap) + }) + } +}