Skip to content

Commit

Permalink
rpc(server): simplify connection closing
Browse files Browse the repository at this point in the history
  • Loading branch information
BrianHicks committed Dec 6, 2016
1 parent c131caa commit dec3d87
Showing 1 changed file with 20 additions and 42 deletions.
62 changes: 20 additions & 42 deletions rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ func (s *Server) newREST(ctx context.Context, addr *url.URL) (*http.Server, erro
func (s *Server) Listen(ctx context.Context, addr *url.URL) error {
logger := logging.GetLogger(ctx).WithField("addr", addr)

// set up a context within the waitgroup
wg, ctx := errgroup.WithContext(ctx)

// set up listeners
//
// We'll start with a regular net.Listener. This is going to be our entry
Expand All @@ -118,6 +121,14 @@ func (s *Server) Listen(ctx context.Context, addr *url.URL) error {
if err != nil {
return errors.Wrap(err, "failed to listen")
}
wg.Go(func() error {
logger.Debug("waiting to close listener")
<-ctx.Done()
logger.Info("closing listener")

return lis.Close()
})

if s.Security.UseSSL {
logger.Debug("wrapping insecure listener in secure listener")
lis, err = s.Security.WrapListener(lis)
Expand All @@ -128,41 +139,21 @@ func (s *Server) Listen(ctx context.Context, addr *url.URL) error {

mux := cmux.New(lis)

// set up a context for cancelling out of all of this
//
// we set up a sub-context here so we can cancel it on exit. It lets us tear
// down listeners in the errgroup more easily.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
wg, ctx := errgroup.WithContext(ctx)

// start the GRPC listener and server
//
// Each of the tasks in the workers must handle if the errors they received
// are any form of use-after-close error. This happens on shutdown for
// cleanup purposes. In most of these cases, receiving an error means we're
// already cleaned up so we just need to check which error it is.
grpcSrv, err := s.newGRPC()
if err != nil {
return errors.Wrap(err, "failed to create grpc server")
}
grpcLis := mux.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))

wg.Go(func() error {
logger.Debug("waiting to close GRPC listener")
<-ctx.Done()
logger.Info("closing GRPC listener")

err := grpcLis.Close()
if err != nil && !IsClosedNetworkConnErr(err) {
return errors.Wrap(err, "failed to close GRPC listener")
grpcSrv, err := s.newGRPC()
if err != nil {
return errors.Wrap(err, "failed to create grpc server")
}
return nil
})
lis := mux.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))

wg.Go(func() error {
logger.Info("serving GRPC")
err := grpcSrv.Serve(grpcLis)
err = grpcSrv.Serve(lis)
logger.Debug("finished serving GRPC")

if err != nil && err != cmux.ErrListenerClosed {
Expand All @@ -174,27 +165,14 @@ func (s *Server) Listen(ctx context.Context, addr *url.URL) error {
// start the REST gateway listener and server
//
// Same cancellation semantics as GRPC listeners.
restSrv, err := s.newREST(ctx, addr)
if err != nil {
return errors.Wrap(err, "failed to create REST server")
}
restLis := mux.Match(cmux.HTTP1())

wg.Go(func() error {
logger.Debug("waiting to close REST listener")
<-ctx.Done()
logger.Info("closing REST listener")

err := restLis.Close()
if err != nil && !IsClosedNetworkConnErr(err) {
return errors.Wrap(err, "failed to close REST listener")
restSrv, err := s.newREST(ctx, addr)
if err != nil {
return errors.Wrap(err, "failed to create REST server")
}
return nil
})

wg.Go(func() error {
logger.Info("serving REST")
err := restSrv.Serve(restLis)
err = restSrv.Serve(mux.Match(cmux.HTTP1()))
logger.Debug("finished serving REST")

if err != nil && err != cmux.ErrListenerClosed {
Expand Down

0 comments on commit dec3d87

Please sign in to comment.