diff --git a/embed/etcd.go b/embed/etcd.go index e69adbfd63b4..7388cfd1365e 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -16,6 +16,7 @@ package embed import ( "context" + "crypto/tls" "fmt" "io/ioutil" defaultLog "log" @@ -28,6 +29,7 @@ import ( "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/api/etcdhttp" "github.com/coreos/etcd/etcdserver/api/v2http" + "github.com/coreos/etcd/etcdserver/api/v3rpc" "github.com/coreos/etcd/pkg/cors" "github.com/coreos/etcd/pkg/debugutil" runtimeutil "github.com/coreos/etcd/pkg/runtime" @@ -35,6 +37,9 @@ import ( "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/rafthttp" "github.com/coreos/pkg/capnslog" + + "github.com/cockroachdb/cmux" + "google.golang.org/grpc" ) var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "embed") @@ -152,29 +157,39 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { return } + // buffer channel so goroutines on closed connections won't wait forever + e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs)) + + e.Server.Start() + // configure peer handlers after rafthttp.Transport started ph := etcdhttp.NewPeerHandler(e.Server) + var peerTLScfg *tls.Config + if !cfg.PeerTLSInfo.Empty() { + if peerTLScfg, err = cfg.PeerTLSInfo.ServerConfig(); err != nil { + return + } + } for _, p := range e.Peers { + gs := v3rpc.Server(e.Server, peerTLScfg) + m := cmux.New(p.Listener) + go gs.Serve(m.Match(cmux.HTTP2())) srv := &http.Server{ - Handler: ph, + Handler: grpcHandlerFunc(gs, ph), ReadTimeout: 5 * time.Minute, ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error } - - l := p.Listener - p.serve = func() error { return srv.Serve(l) } + go srv.Serve(m.Match(cmux.Any())) + p.serve = func() error { return m.Serve() } p.close = func(ctx context.Context) error { // gracefully shutdown http.Server // close open listeners, idle connections // until context cancel or time-out + e.stopGRPCServer(gs) return srv.Shutdown(ctx) } } - // buffer channel so goroutines on closed connections won't wait forever - e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs)) - - e.Server.Start() if err = e.serve(); err != nil { return } @@ -190,29 +205,9 @@ func (e *Etcd) Config() Config { func (e *Etcd) Close() { e.closeOnce.Do(func() { close(e.stopc) }) - timeout := 2 * time.Second - if e.Server != nil { - timeout = e.Server.Cfg.ReqTimeout() - } for _, sctx := range e.sctxs { for gs := range sctx.grpcServerC { - ch := make(chan struct{}) - go func() { - defer close(ch) - // close listeners to stop accepting new connections, - // will block on any existing transports - gs.GracefulStop() - }() - // wait until all pending RPCs are finished - select { - case <-ch: - case <-time.After(timeout): - // took too long, manually close open transports - // e.g. watch streams - gs.Stop() - // concurrent GracefulStop should be interrupted - <-ch - } + e.stopGRPCServer(gs) } } @@ -243,6 +238,30 @@ func (e *Etcd) Close() { } } +func (e *Etcd) stopGRPCServer(gs *grpc.Server) { + timeout := 2 * time.Second + if e.Server != nil { + timeout = e.Server.Cfg.ReqTimeout() + } + ch := make(chan struct{}) + go func() { + defer close(ch) + // close listeners to stop accepting new connections, + // will block on any existing transports + gs.GracefulStop() + }() + // wait until all pending RPCs are finished + select { + case <-ch: + case <-time.After(timeout): + // took too long, manually close open transports + // e.g. watch streams + gs.Stop() + // concurrent GracefulStop should be interrupted + <-ch + } +} + func (e *Etcd) Err() <-chan error { return e.errc } func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {