Skip to content

Commit

Permalink
*: cancel required leader streams when memeber lost its leader
Browse files Browse the repository at this point in the history
  • Loading branch information
xiang90 committed May 13, 2016
1 parent 2e01105 commit 9c103dd
Show file tree
Hide file tree
Showing 9 changed files with 217 additions and 44 deletions.
2 changes: 1 addition & 1 deletion etcdserver/api/v3rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
opts = append(opts, grpc.Creds(credentials.NewTLS(tls)))
}
opts = append(opts, grpc.UnaryInterceptor(newUnaryInterceptor(s)))
opts = append(opts, grpc.StreamInterceptor(metricsStreamInterceptor))
opts = append(opts, grpc.StreamInterceptor(newStreamInterceptor(s)))

grpcServer := grpc.NewServer(opts...)
pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
Expand Down
90 changes: 90 additions & 0 deletions etcdserver/api/v3rpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package v3rpc

import (
"strings"
"sync"
"time"

"github.com/coreos/etcd/etcdserver"
Expand All @@ -28,6 +29,15 @@ import (
"google.golang.org/grpc/metadata"
)

const (
maxNoLeaderCnt = 3
)

type streamsMap struct {
mu sync.Mutex
streams map[grpc.ServerStream]struct{}
}

func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
md, ok := metadata.FromContext(ctx)
Expand All @@ -42,6 +52,37 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
}
}

func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor {
smap := monitorLeader(s)

return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
md, ok := metadata.FromContext(ss.Context())
if ok {
if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
if s.Leader() == types.ID(raft.None) {
return rpctypes.ErrGRPCNoLeader
}

cctx, cancel := context.WithCancel(ss.Context())
ss = serverStreamWithCtx{ctx: cctx, cancel: &cancel, ServerStream: ss}

smap.mu.Lock()
smap.streams[ss] = struct{}{}
smap.mu.Unlock()

defer func() {
smap.mu.Lock()
delete(smap.streams, ss)
smap.mu.Unlock()
cancel()
}()

}
}
return metricsStreamInterceptor(srv, ss, info, handler)
}
}

func metricsUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
service, method := splitMethodName(info.FullMethod)
receivedCounter.WithLabelValues(service, method).Inc()
Expand Down Expand Up @@ -75,3 +116,52 @@ func splitMethodName(fullMethodName string) (string, string) {
}
return "unknown", "unknown"
}

type serverStreamWithCtx struct {
grpc.ServerStream
ctx context.Context
cancel *context.CancelFunc
}

func (ssc serverStreamWithCtx) Context() context.Context { return ssc.ctx }

func monitorLeader(s *etcdserver.EtcdServer) *streamsMap {
smap := &streamsMap{
streams: make(map[grpc.ServerStream]struct{}),
}

go func() {
election := time.Duration(s.Cfg.TickMs) * time.Duration(s.Cfg.ElectionTicks) * time.Millisecond
noLeaderCnt := 0

for {
select {
case <-s.StopNotify():
return
case <-time.After(election):
if s.Leader() == types.ID(raft.None) {
noLeaderCnt++
} else {
noLeaderCnt = 0
}

// We are more conservative on canceling existing streams. Reconnecting streams
// cost much more than just rejecting new requests. So we wait until the member
// cannot find a leader for maxNoLeaderCnt election timeouts to cancel existing streams.
if noLeaderCnt >= maxNoLeaderCnt {
smap.mu.Lock()
for ss := range smap.streams {
if ssWithCtx, ok := ss.(serverStreamWithCtx); ok {
(*ssWithCtx.cancel)()
<-ss.Context().Done()
}
}
smap.streams = make(map[grpc.ServerStream]struct{})
smap.mu.Unlock()
}
}
}
}()

return smap
}
21 changes: 19 additions & 2 deletions etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ import (
"sync"
"time"

"golang.org/x/net/context"

"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/mvcc/mvccpb"
Expand Down Expand Up @@ -105,10 +108,24 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
progress: make(map[mvcc.WatchID]bool),
closec: make(chan struct{}),
}
defer sws.close()

go sws.sendLoop()
return sws.recvLoop()
errc := make(chan error, 1)
go func() {
errc <- sws.recvLoop()
sws.close()
}()
select {
case err := <-errc:
return err
case <-stream.Context().Done():
err := stream.Context().Err()
// the only server-side cancellation is noleader for now.
if err == context.Canceled {
return rpctypes.ErrGRPCNoLeader
}
return err
}
}

func (sws *serverWatchStream) recvLoop() error {
Expand Down
10 changes: 5 additions & 5 deletions etcdserver/quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,20 @@ const (
)

func NewBackendQuota(s *EtcdServer) Quota {
if s.cfg.QuotaBackendBytes < 0 {
if s.Cfg.QuotaBackendBytes < 0 {
// disable quotas if negative
plog.Warningf("disabling backend quota")
return &passthroughQuota{}
}
if s.cfg.QuotaBackendBytes == 0 {
if s.Cfg.QuotaBackendBytes == 0 {
// use default size if no quota size given
return &backendQuota{s, backend.DefaultQuotaBytes}
}
if s.cfg.QuotaBackendBytes > backend.MaxQuotaBytes {
plog.Warningf("backend quota %v exceeds maximum quota %v; using maximum", s.cfg.QuotaBackendBytes, backend.MaxQuotaBytes)
if s.Cfg.QuotaBackendBytes > backend.MaxQuotaBytes {
plog.Warningf("backend quota %v exceeds maximum quota %v; using maximum", s.Cfg.QuotaBackendBytes, backend.MaxQuotaBytes)
return &backendQuota{s, backend.MaxQuotaBytes}
}
return &backendQuota{s, s.cfg.QuotaBackendBytes}
return &backendQuota{s, s.Cfg.QuotaBackendBytes}
}

func (b *backendQuota) Available(v interface{}) bool {
Expand Down
8 changes: 4 additions & 4 deletions etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ func (r *raftNode) start(s *EtcdServer) {
r.done = make(chan struct{})

heartbeat := 200 * time.Millisecond
if s.cfg != nil {
heartbeat = time.Duration(s.cfg.TickMs) * time.Millisecond
if s.Cfg != nil {
heartbeat = time.Duration(s.Cfg.TickMs) * time.Millisecond
}
// set up contention detectors for raft heartbeat message.
// expect to send a heartbeat within 2 heartbeat intervals.
Expand Down Expand Up @@ -173,7 +173,7 @@ func (r *raftNode) start(s *EtcdServer) {
// it promotes or demotes instead of modifying server directly.
syncC = r.s.SyncTicker
if r.s.lessor != nil {
r.s.lessor.Promote(r.s.cfg.electionTimeout())
r.s.lessor.Promote(r.s.Cfg.electionTimeout())
}
// TODO: remove the nil checking
// current test utility does not provide the stats
Expand Down Expand Up @@ -238,7 +238,7 @@ func (r *raftNode) start(s *EtcdServer) {
raftDone <- struct{}{}
r.Advance()
case <-syncC:
r.s.sync(r.s.cfg.ReqTimeout())
r.s.sync(r.s.Cfg.ReqTimeout())
case <-r.stopped:
return
}
Expand Down
28 changes: 14 additions & 14 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,11 @@ type EtcdServer struct {
// count the number of inflight snapshots.
// MUST use atomic operation to access this field.
inflightSnapshots int64
Cfg *ServerConfig

readych chan struct{}
r raftNode

cfg *ServerConfig
snapCount uint64

w wait.Wait
Expand Down Expand Up @@ -369,7 +369,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {

srv = &EtcdServer{
readych: make(chan struct{}),
cfg: cfg,
Cfg: cfg,
snapCount: cfg.SnapCount,
errorc: make(chan error, 1),
store: st,
Expand Down Expand Up @@ -444,7 +444,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
// It also starts a goroutine to publish its server information.
func (s *EtcdServer) Start() {
s.start()
go s.publish(s.cfg.ReqTimeout())
go s.publish(s.Cfg.ReqTimeout())
go s.purgeFile()
go monitorFileDescriptor(s.done)
go s.monitorVersions()
Expand Down Expand Up @@ -473,11 +473,11 @@ func (s *EtcdServer) start() {

func (s *EtcdServer) purgeFile() {
var serrc, werrc <-chan error
if s.cfg.MaxSnapFiles > 0 {
serrc = fileutil.PurgeFile(s.cfg.SnapDir(), "snap", s.cfg.MaxSnapFiles, purgeFileInterval, s.done)
if s.Cfg.MaxSnapFiles > 0 {
serrc = fileutil.PurgeFile(s.Cfg.SnapDir(), "snap", s.Cfg.MaxSnapFiles, purgeFileInterval, s.done)
}
if s.cfg.MaxWALFiles > 0 {
werrc = fileutil.PurgeFile(s.cfg.WALDir(), "wal", s.cfg.MaxWALFiles, purgeFileInterval, s.done)
if s.Cfg.MaxWALFiles > 0 {
werrc = fileutil.PurgeFile(s.Cfg.WALDir(), "wal", s.Cfg.MaxWALFiles, purgeFileInterval, s.done)
}
select {
case e := <-werrc:
Expand Down Expand Up @@ -623,7 +623,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
plog.Panicf("get database snapshot file path error: %v", err)
}

fn := path.Join(s.cfg.SnapDir(), databaseFilename)
fn := path.Join(s.Cfg.SnapDir(), databaseFilename)
if err := os.Rename(snapfn, fn); err != nil {
plog.Panicf("rename snapshot file error: %v", err)
}
Expand Down Expand Up @@ -764,7 +764,7 @@ func (s *EtcdServer) LeaderStats() []byte {
func (s *EtcdServer) StoreStats() []byte { return s.store.JsonStats() }

func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) error {
if s.cfg.StrictReconfigCheck && !s.cluster.IsReadyToAddNewMember() {
if s.Cfg.StrictReconfigCheck && !s.cluster.IsReadyToAddNewMember() {
// If s.cfg.StrictReconfigCheck is false, it means the option --strict-reconfig-check isn't passed to etcd.
// In such a case adding a new member is allowed unconditionally
return ErrNotEnoughStartedMembers
Expand All @@ -784,7 +784,7 @@ func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) erro
}

func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) error {
if s.cfg.StrictReconfigCheck && !s.cluster.IsReadyToRemoveMember(id) {
if s.Cfg.StrictReconfigCheck && !s.cluster.IsReadyToRemoveMember(id) {
// If s.cfg.StrictReconfigCheck is false, it means the option --strict-reconfig-check isn't passed to etcd.
// In such a case removing a member is allowed unconditionally
return ErrNotEnoughStartedMembers
Expand Down Expand Up @@ -823,7 +823,7 @@ func (s *EtcdServer) Lead() uint64 { return atomic.LoadUint64(&s.r.lead) }

func (s *EtcdServer) Leader() types.ID { return types.ID(s.Lead()) }

func (s *EtcdServer) IsPprofEnabled() bool { return s.cfg.EnablePprof }
func (s *EtcdServer) IsPprofEnabled() bool { return s.Cfg.EnablePprof }

// configure sends a configuration change through consensus and
// then waits for it to be applied to the server. It
Expand Down Expand Up @@ -939,7 +939,7 @@ func (s *EtcdServer) send(ms []raftpb.Message) {
ok, exceed := s.r.td.Observe(ms[i].To)
if !ok {
// TODO: limit request rate.
plog.Warningf("failed to send out heartbeat on time (exceeded the %dms timeout for %v)", s.cfg.TickMs, exceed)
plog.Warningf("failed to send out heartbeat on time (exceeded the %dms timeout for %v)", s.Cfg.TickMs, exceed)
plog.Warningf("server is likely overloaded")
}
}
Expand Down Expand Up @@ -1221,7 +1221,7 @@ func (s *EtcdServer) updateClusterVersion(ver string) {
Path: membership.StoreClusterVersionKey(),
Val: ver,
}
ctx, cancel := context.WithTimeout(context.Background(), s.cfg.ReqTimeout())
ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
_, err := s.Do(ctx, req)
cancel()
switch err {
Expand All @@ -1241,7 +1241,7 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
return ErrCanceled
case context.DeadlineExceeded:
curLeadElected := s.r.leadElectedTime()
prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.cfg.ElectionTicks) * time.Duration(s.cfg.TickMs) * time.Millisecond)
prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond)
if start.After(prevLeadLost) && start.Before(curLeadElected) {
return ErrTimeoutDueToLeaderFail
}
Expand Down
Loading

0 comments on commit 9c103dd

Please sign in to comment.