Skip to content

Commit

Permalink
embed: serve basic v3 grpc over peer port
Browse files Browse the repository at this point in the history
  • Loading branch information
Anthony Romano committed Aug 19, 2017
1 parent b8b97c1 commit cbde61f
Showing 1 changed file with 48 additions and 29 deletions.
77 changes: 48 additions & 29 deletions embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package embed

import (
"context"
"crypto/tls"
"fmt"
"io/ioutil"
defaultLog "log"
Expand All @@ -28,13 +29,17 @@ import (
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/etcdhttp"
"github.com/coreos/etcd/etcdserver/api/v2http"
"github.com/coreos/etcd/etcdserver/api/v3rpc"
"github.com/coreos/etcd/pkg/cors"
"github.com/coreos/etcd/pkg/debugutil"
runtimeutil "github.com/coreos/etcd/pkg/runtime"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/pkg/capnslog"

"github.com/cockroachdb/cmux"
"google.golang.org/grpc"
)

var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "embed")
Expand Down Expand Up @@ -152,29 +157,39 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
return
}

// buffer channel so goroutines on closed connections won't wait forever
e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))

e.Server.Start()

// configure peer handlers after rafthttp.Transport started
ph := etcdhttp.NewPeerHandler(e.Server)
var peerTLScfg *tls.Config
if !cfg.PeerTLSInfo.Empty() {
if peerTLScfg, err = cfg.PeerTLSInfo.ServerConfig(); err != nil {
return
}
}
for _, p := range e.Peers {
gs := v3rpc.Server(e.Server, peerTLScfg)
m := cmux.New(p.Listener)
go gs.Serve(m.Match(cmux.HTTP2()))
srv := &http.Server{
Handler: ph,
Handler: grpcHandlerFunc(gs, ph),
ReadTimeout: 5 * time.Minute,
ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error
}

l := p.Listener
p.serve = func() error { return srv.Serve(l) }
go srv.Serve(m.Match(cmux.Any()))
p.serve = func() error { return m.Serve() }
p.close = func(ctx context.Context) error {
// gracefully shutdown http.Server
// close open listeners, idle connections
// until context cancel or time-out
e.stopGRPCServer(gs)
return srv.Shutdown(ctx)
}
}

// buffer channel so goroutines on closed connections won't wait forever
e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))

e.Server.Start()
if err = e.serve(); err != nil {
return
}
Expand All @@ -190,29 +205,9 @@ func (e *Etcd) Config() Config {
func (e *Etcd) Close() {
e.closeOnce.Do(func() { close(e.stopc) })

timeout := 2 * time.Second
if e.Server != nil {
timeout = e.Server.Cfg.ReqTimeout()
}
for _, sctx := range e.sctxs {
for gs := range sctx.grpcServerC {
ch := make(chan struct{})
go func() {
defer close(ch)
// close listeners to stop accepting new connections,
// will block on any existing transports
gs.GracefulStop()
}()
// wait until all pending RPCs are finished
select {
case <-ch:
case <-time.After(timeout):
// took too long, manually close open transports
// e.g. watch streams
gs.Stop()
// concurrent GracefulStop should be interrupted
<-ch
}
e.stopGRPCServer(gs)
}
}

Expand Down Expand Up @@ -243,6 +238,30 @@ func (e *Etcd) Close() {
}
}

func (e *Etcd) stopGRPCServer(gs *grpc.Server) {
timeout := 2 * time.Second
if e.Server != nil {
timeout = e.Server.Cfg.ReqTimeout()
}
ch := make(chan struct{})
go func() {
defer close(ch)
// close listeners to stop accepting new connections,
// will block on any existing transports
gs.GracefulStop()
}()
// wait until all pending RPCs are finished
select {
case <-ch:
case <-time.After(timeout):
// took too long, manually close open transports
// e.g. watch streams
gs.Stop()
// concurrent GracefulStop should be interrupted
<-ch
}
}

func (e *Etcd) Err() <-chan error { return e.errc }

func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
Expand Down

0 comments on commit cbde61f

Please sign in to comment.