Skip to content

Commit

Permalink
Propagate error to main
Browse files Browse the repository at this point in the history
Resolves racing condition when healthcheck's status is set to available
after it is set to unavailable by server initialization process

Signed-off-by: stefan vassilev <[email protected]>
  • Loading branch information
stefanvassilev committed Apr 26, 2019
1 parent 381a2c8 commit bd28b50
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 6 deletions.
17 changes: 14 additions & 3 deletions cmd/flags/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,23 @@ type Service struct {
MetricsFactory metrics.Factory

signalsChannel chan os.Signal

// HcStatusChannel is used for error propagation
HcStatusChannel chan healthcheck.Status
}

// NewService creates a new Service.
func NewService(adminPort int) *Service {
signalsChannel := make(chan os.Signal)
hcStatusChannel := make(chan healthcheck.Status)
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)

grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))

return &Service{
Admin: NewAdminServer(adminPort),
signalsChannel: signalsChannel,
Admin: NewAdminServer(adminPort),
signalsChannel: signalsChannel,
HcStatusChannel: hcStatusChannel,
}
}

Expand Down Expand Up @@ -120,7 +125,13 @@ func (s *Service) HC() *healthcheck.HealthCheck {
// If then runs the shutdown function and exits.
func (s *Service) RunAndThen(shutdown func()) {
s.HC().Ready()
<-s.signalsChannel

select {
case status := <-s.HcStatusChannel:
s.HC().Set(status)
case <-s.signalsChannel:

}

s.Logger.Info("Shutting down")
s.HC().Set(healthcheck.Unavailable)
Expand Down
8 changes: 5 additions & 3 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,14 @@ func (s *Server) Start() {
h := createHandler(s.querySvc, s.svc.Logger, s.tracker)
api_v2.RegisterQueryServiceServer(s.grpcServer, h)

status := s.svc.HcStatusChannel

go func() {
s.svc.Logger.Info("Starting HTTP server", zap.Int("port", s.listenPort))
if err := s.httpServer.Serve(s.httpListener); err != nil {
s.svc.Logger.Error("Could not start HTTP server", zap.Error(err))
}
s.svc.HC().Set(healthcheck.Unavailable)
status <- healthcheck.Unavailable
}()

// Start GRPC server concurrently
Expand All @@ -106,7 +108,7 @@ func (s *Server) Start() {
if err := s.grpcServer.Serve(s.grpcListener); err != nil {
s.svc.Logger.Error("Could not start GRPC server", zap.Error(err))
}
s.svc.HC().Set(healthcheck.Unavailable)
status <- healthcheck.Unavailable
}()

// Start cmux server concurrently.
Expand All @@ -115,7 +117,7 @@ func (s *Server) Start() {
if err := s.muxServer.Serve(); err != nil {
s.svc.Logger.Error("Could not start multiplexed server", zap.Error(err))
}
s.svc.HC().Set(healthcheck.Unavailable)
status <- healthcheck.Unavailable
}()

}

0 comments on commit bd28b50

Please sign in to comment.