From 80c10e150f7d41b0c08818193ce70e4947a83096 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 15 Mar 2017 11:04:25 -0700 Subject: [PATCH] etcdserver: remove possibly compacted entry look-up Fix https://github.com/coreos/etcd/issues/7470. This patch removes unnecessary term look-up in 'createMergedSnapshotMessage', which can trigger panic if raft entry at etcdProgress.appliedi got compacted by subsequent 'MsgSnap' messages--if a follower is being (in this case, network latency spikes), it could receive subsequent 'MsgSnap' requests from leader. etcd server-side 'applyAll' routine and raft's Ready processing routine becomes asynchronous after raft entries are persisted. And given that raft Ready routine takes less time to finish, it is possible that second 'MsgSnap' is being handled, while the slow 'applyAll' is still processing the first(old) 'MsgSnap'. Then raft Ready routine can compact the log entries at future index to 'applyAll'. That is how 'createMergedSnapshotMessage' tried to look up raft term with outdated etcdProgress.appliedi. Signed-off-by: Gyu-Ho Lee --- etcdserver/server.go | 18 ++++++++++-------- etcdserver/server_test.go | 2 +- etcdserver/snapshot_merge.go | 8 +------- 3 files changed, 12 insertions(+), 16 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index 187d82e1392..1eb5bcad278 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -599,6 +599,7 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) { type etcdProgress struct { confState raftpb.ConfState snapi uint64 + appliedt uint64 appliedi uint64 } @@ -676,6 +677,7 @@ func (s *EtcdServer) run() { ep := etcdProgress{ confState: sn.Metadata.ConfState, snapi: sn.Metadata.Index, + appliedt: sn.Metadata.Term, appliedi: sn.Metadata.Index, } @@ -777,7 +779,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) { select { // snapshot requested via send() case m := <-s.r.msgSnapC: - merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState) + merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState) s.sendMergedSnap(merged) default: } @@ -879,6 +881,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { } plog.Info("finished adding peers from new cluster configuration into network...") + ep.appliedt = apply.snapshot.Metadata.Term ep.appliedi = apply.snapshot.Metadata.Index ep.snapi = ep.appliedi ep.confState = apply.snapshot.Metadata.ConfState @@ -900,7 +903,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) { return } var shouldstop bool - if ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop { + if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop { go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster")) } } @@ -1254,9 +1257,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) { // apply takes entries received from Raft (after it has been committed) and // applies them to the current state of the EtcdServer. // The given entries should not be empty. -func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint64, bool) { - var applied uint64 - var shouldstop bool +func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (appliedt uint64, appliedi uint64, shouldStop bool) { for i := range es { e := es[i] switch e.Type { @@ -1266,16 +1267,17 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint var cc raftpb.ConfChange pbutil.MustUnmarshal(&cc, e.Data) removedSelf, err := s.applyConfChange(cc, confState) - shouldstop = shouldstop || removedSelf + shouldStop = shouldStop || removedSelf s.w.Trigger(cc.ID, err) default: plog.Panicf("entry type should be either EntryNormal or EntryConfChange") } atomic.StoreUint64(&s.r.index, e.Index) atomic.StoreUint64(&s.r.term, e.Term) - applied = e.Index + appliedt = e.Term + appliedi = e.Index } - return applied, shouldstop + return appliedt, appliedi, shouldStop } // applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index af79517e0d4..57b1aebd858 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -615,7 +615,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) { ents = append(ents, ent) } - _, shouldStop := srv.apply(ents, &raftpb.ConfState{}) + _, _, shouldStop := srv.apply(ents, &raftpb.ConfState{}) if !shouldStop { t.Errorf("shouldStop = %t, want %t", shouldStop, true) } diff --git a/etcdserver/snapshot_merge.go b/etcdserver/snapshot_merge.go index 1de996c507f..9cfc852168b 100644 --- a/etcdserver/snapshot_merge.go +++ b/etcdserver/snapshot_merge.go @@ -16,7 +16,6 @@ package etcdserver import ( "io" - "log" "github.com/coreos/etcd/mvcc/backend" "github.com/coreos/etcd/raft/raftpb" @@ -26,12 +25,7 @@ import ( // createMergedSnapshotMessage creates a snapshot message that contains: raft status (term, conf), // a snapshot of v2 store inside raft.Snapshot as []byte, a snapshot of v3 KV in the top level message // as ReadCloser. -func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64, confState raftpb.ConfState) snap.Message { - snapt, err := s.r.raftStorage.Term(snapi) - if err != nil { - log.Panicf("get term should never fail: %v", err) - } - +func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) snap.Message { // get a snapshot of v2 store as []byte clone := s.store.Clone() d, err := clone.SaveNoCopy()