Skip to content

Commit

Permalink
*: fetch peers from disk to check data corruption
Browse files Browse the repository at this point in the history
Signed-off-by: Gyu-Ho Lee <[email protected]>
  • Loading branch information
gyuho committed Nov 22, 2017
1 parent 5981c45 commit 4ab97dd
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 13 deletions.
2 changes: 2 additions & 0 deletions embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {

if needCorruptCheck {
if err = e.Server.CheckInitialHashKV(); err != nil {
// since this is before EtcdServer starts
e.Server = nil
return e, err
}
}
Expand Down
6 changes: 3 additions & 3 deletions etcdserver/corrupt.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,11 @@ func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*peerHashKVResp) {
pURLs = append(pURLs, m.PeerURLs)
}
if len(pURLs) == 0 {
for k, v := range s.Cfg.InitialPeerURLsMap {
if k == s.Cfg.Name {
for id, ps := range s.idToPeerURLs {
if id == s.ID() {
continue
}
pURLs = append(pURLs, v.StringSlice())
pURLs = append(pURLs, ps)
}
}

Expand Down
24 changes: 17 additions & 7 deletions etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id
return id, n, s, w
}

func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL, map[types.ID][]string) {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
Expand All @@ -444,16 +444,17 @@ func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *member
MaxInflightMsgs: maxInflightMsgs,
CheckQuorum: true,
}
_, idToPeerURLs := getIDs(snapshot, ents)

n := raft.RestartNode(c)
raftStatusMu.Lock()
raftStatus = n.Status
raftStatusMu.Unlock()
advanceTicksForElection(n, c.ElectionTick)
return id, cl, n, s, w
return id, cl, n, s, w, idToPeerURLs
}

func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL, map[types.ID][]string) {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
Expand All @@ -470,7 +471,8 @@ func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types
}

// force append the configuration change entries
toAppEnts := createConfigChangeEnts(getIDs(snapshot, ents), uint64(id), st.Term, st.Commit)
ids, idToPeerURLs := getIDs(snapshot, ents)
toAppEnts := createConfigChangeEnts(ids, uint64(id), st.Term, st.Commit)
ents = append(ents, toAppEnts...)

// force commit newly appended entries
Expand Down Expand Up @@ -501,21 +503,22 @@ func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types
}
n := raft.RestartNode(c)
raftStatus = n.Status
return id, cl, n, s, w
return id, cl, n, s, w, idToPeerURLs
}

// getIDs returns an ordered set of IDs included in the given snapshot and
// the entries. The given snapshot/entries can contain two kinds of
// ID-related entry:
// - ConfChangeAddNode, in which case the contained ID will be added into the set.
// - ConfChangeRemoveNode, in which case the contained ID will be removed from the set.
func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) ([]uint64, map[types.ID][]string) {
ids := make(map[uint64]bool)
if snap != nil {
for _, id := range snap.Metadata.ConfState.Nodes {
ids[id] = true
}
}
idToPeerURLs := make(map[types.ID][]string)
for _, e := range ents {
if e.Type != raftpb.EntryConfChange {
continue
Expand All @@ -525,6 +528,13 @@ func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
switch cc.Type {
case raftpb.ConfChangeAddNode:
ids[cc.NodeID] = true
var mem membership.Member
if len(cc.Context) > 0 {
if err := json.Unmarshal(cc.Context, &mem); err != nil {
plog.Panicf("unmarshal member should never fail: %v", err)
}
}
idToPeerURLs[types.ID(cc.NodeID)] = mem.PeerURLs
case raftpb.ConfChangeRemoveNode:
delete(ids, cc.NodeID)
case raftpb.ConfChangeUpdateNode:
Expand All @@ -538,7 +548,7 @@ func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
sids = append(sids, id)
}
sort.Sort(sids)
return []uint64(sids)
return []uint64(sids), idToPeerURLs
}

// createConfigChangeEnts creates a series of Raft entries (i.e.
Expand Down
2 changes: 1 addition & 1 deletion etcdserver/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestGetIDs(t *testing.T) {
if tt.confState != nil {
snap.Metadata.ConfState = *tt.confState
}
idSet := getIDs(&snap, tt.ents)
idSet, _ := getIDs(&snap, tt.ents)
if !reflect.DeepEqual(idSet, tt.widSet) {
t.Errorf("#%d: idset = %#v, want %#v", i, idSet, tt.widSet)
}
Expand Down
10 changes: 8 additions & 2 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ type EtcdServer struct {

cluster *membership.RaftCluster

// only populated when the node has already been initialzed
// useful for fetching members before serving peer traffic
idToPeerURLs map[types.ID][]string

store store.Store
snapshotter *snap.Snapshotter

Expand Down Expand Up @@ -300,6 +304,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
snapshot *raftpb.Snapshot
)

idToPeerURLs := make(map[types.ID][]string)
switch {
case !haveWAL && !cfg.NewCluster:
if err = cfg.VerifyJoinExisting(); err != nil {
Expand Down Expand Up @@ -387,9 +392,9 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
}
cfg.Print()
if !cfg.ForceNewCluster {
id, cl, n, s, w = restartNode(cfg, snapshot)
id, cl, n, s, w, idToPeerURLs = restartNode(cfg, snapshot)
} else {
id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot)
id, cl, n, s, w, idToPeerURLs = restartAsStandaloneNode(cfg, snapshot)
}
cl.SetStore(st)
cl.SetBackend(be)
Expand Down Expand Up @@ -428,6 +433,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
id: id,
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
cluster: cl,
idToPeerURLs: idToPeerURLs,
stats: sstats,
lstats: lstats,
SyncTicker: time.NewTicker(500 * time.Millisecond),
Expand Down

0 comments on commit 4ab97dd

Please sign in to comment.