From 065a273cc330018bcf0d2c3a52a557104e80efd5 Mon Sep 17 00:00:00 2001 From: Willy Kloucek Date: Thu, 17 Feb 2022 16:26:37 +0100 Subject: [PATCH] check running nats server for errors --- nats/pkg/command/server.go | 118 ++++++++++++++++---------------- nats/pkg/logging/nats.go | 52 ++++++++++++++ nats/pkg/server/nats/nats.go | 17 +++++ nats/pkg/server/nats/options.go | 37 ++++++++++ 4 files changed, 166 insertions(+), 58 deletions(-) create mode 100644 nats/pkg/logging/nats.go create mode 100644 nats/pkg/server/nats/nats.go create mode 100644 nats/pkg/server/nats/options.go diff --git a/nats/pkg/command/server.go b/nats/pkg/command/server.go index baf6bf5f8d7..da6884979e7 100644 --- a/nats/pkg/command/server.go +++ b/nats/pkg/command/server.go @@ -1,16 +1,15 @@ package command import ( + "context" "fmt" - "os" - "os/signal" - "syscall" + "time" - "github.com/cs3org/reva/pkg/events/server" + "github.com/oklog/run" "github.com/owncloud/ocis/nats/pkg/config" "github.com/owncloud/ocis/nats/pkg/config/parser" "github.com/owncloud/ocis/nats/pkg/logging" - "github.com/owncloud/ocis/ocis-pkg/log" + "github.com/owncloud/ocis/nats/pkg/server/nats" "github.com/urfave/cli/v2" // TODO: .Logger Option on events/server would make this import redundant @@ -28,65 +27,68 @@ func Server(cfg *config.Config) *cli.Command { }, Action: func(c *cli.Context) error { logger := logging.Configure(cfg.Service.Name, cfg.Log) - ch := make(chan os.Signal, 1) - signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) - err := server.RunNatsServer(server.Host(cfg.Nats.Host), server.Port(cfg.Nats.Port), server.StanOpts(func(o *stanServer.Options) { - o.CustomLogger = &logWrapper{logger} - })) - if err != nil { - return err - } - for { - select { - case <-ch: - // TODO: Should we shut down the NatsServer in a proper way here? - // That would require a reference to the StanServer instance for being able to call - // StanServer.Shutdown() github.com/cs3org/reva/pkg/events/server doesn't provide that - // currently - return nil + + gr := run.Group{} + ctx, cancel := func() (context.Context, context.CancelFunc) { + if cfg.Context == nil { + return context.WithCancel(context.Background()) } - } - }, - } -} + return context.WithCancel(cfg.Context) + }() -// we need to wrap our logger so we can pass it to the nats server -type logWrapper struct { - logger log.Logger -} + defer cancel() -// Noticef logs a notice statement -func (l *logWrapper) Noticef(format string, v ...interface{}) { - msg := fmt.Sprintf(format, v...) - l.logger.Info().Msg(msg) -} + var natsServer *stanServer.StanServer -// Warnf logs a warning statement -func (l *logWrapper) Warnf(format string, v ...interface{}) { - msg := fmt.Sprintf(format, v...) - l.logger.Warn().Msg(msg) -} + gr.Add(func() error { + var err error -// Fatalf logs a fatal statement -func (l *logWrapper) Fatalf(format string, v ...interface{}) { - msg := fmt.Sprintf(format, v...) - l.logger.Fatal().Msg(msg) -} + natsServer, err = nats.RunNatsServer( + nats.Host(cfg.Nats.Host), + nats.Port(cfg.Nats.Port), + nats.StanOpts( + func(o *stanServer.Options) { + o.CustomLogger = logging.NewLogWrapper(logger) + }, + ), + ) -// Errorf logs an error statement -func (l *logWrapper) Errorf(format string, v ...interface{}) { - msg := fmt.Sprintf(format, v...) - l.logger.Error().Msg(msg) -} + if err != nil { + return err + } -// Debugf logs a debug statement -func (l *logWrapper) Debugf(format string, v ...interface{}) { - msg := fmt.Sprintf(format, v...) - l.logger.Debug().Msg(msg) -} + errChan := make(chan error) + + go func() { + for { + // check if NATs server has an encountered an error + if err := natsServer.LastError(); err != nil { + errChan <- err + return + } + if ctx.Err() != nil { + return // context closed + } + time.Sleep(1 * time.Second) + } + }() -// Tracef logs a trace statement -func (l *logWrapper) Tracef(format string, v ...interface{}) { - msg := fmt.Sprintf(format, v...) - l.logger.Trace().Msg(msg) + select { + case <-ctx.Done(): + return nil + case err = <-errChan: + return err + } + + }, func(_ error) { + logger.Info(). + Msg("Shutting down server") + + natsServer.Shutdown() + cancel() + }) + + return gr.Run() + }, + } } diff --git a/nats/pkg/logging/nats.go b/nats/pkg/logging/nats.go new file mode 100644 index 00000000000..8074de3d602 --- /dev/null +++ b/nats/pkg/logging/nats.go @@ -0,0 +1,52 @@ +package logging + +import ( + "fmt" + + "github.com/owncloud/ocis/ocis-pkg/log" +) + +func NewLogWrapper(logger log.Logger) *LogWrapper { + return &LogWrapper{logger} +} + +// we need to wrap our logger so we can pass it to the nats server +type LogWrapper struct { + logger log.Logger +} + +// Noticef logs a notice statement +func (l *LogWrapper) Noticef(format string, v ...interface{}) { + msg := fmt.Sprintf(format, v...) + l.logger.Info().Msg(msg) +} + +// Warnf logs a warning statement +func (l *LogWrapper) Warnf(format string, v ...interface{}) { + msg := fmt.Sprintf(format, v...) + l.logger.Warn().Msg(msg) +} + +// Fatalf logs a fatal statement +func (l *LogWrapper) Fatalf(format string, v ...interface{}) { + msg := fmt.Sprintf(format, v...) + l.logger.Fatal().Msg(msg) +} + +// Errorf logs an error statement +func (l *LogWrapper) Errorf(format string, v ...interface{}) { + msg := fmt.Sprintf(format, v...) + l.logger.Error().Msg(msg) +} + +// Debugf logs a debug statement +func (l *LogWrapper) Debugf(format string, v ...interface{}) { + msg := fmt.Sprintf(format, v...) + l.logger.Debug().Msg(msg) +} + +// Tracef logs a trace statement +func (l *LogWrapper) Tracef(format string, v ...interface{}) { + msg := fmt.Sprintf(format, v...) + l.logger.Trace().Msg(msg) +} diff --git a/nats/pkg/server/nats/nats.go b/nats/pkg/server/nats/nats.go new file mode 100644 index 00000000000..398a4c96669 --- /dev/null +++ b/nats/pkg/server/nats/nats.go @@ -0,0 +1,17 @@ +package nats + +import ( + stanServer "github.com/nats-io/nats-streaming-server/server" +) + +// RunNatsServer runs the nats streaming server +func RunNatsServer(opts ...Option) (*stanServer.StanServer, error) { + natsOpts := stanServer.DefaultNatsServerOptions + stanOpts := stanServer.GetDefaultOptions() + + for _, o := range opts { + o(&natsOpts, stanOpts) + } + s, err := stanServer.RunServerWithOpts(stanOpts, &natsOpts) + return s, err +} diff --git a/nats/pkg/server/nats/options.go b/nats/pkg/server/nats/options.go new file mode 100644 index 00000000000..74e54e51f1f --- /dev/null +++ b/nats/pkg/server/nats/options.go @@ -0,0 +1,37 @@ +package nats + +import ( + natsServer "github.com/nats-io/nats-server/v2/server" + stanServer "github.com/nats-io/nats-streaming-server/server" +) + +// Option configures the nats server +type Option func(*natsServer.Options, *stanServer.Options) + +// Host sets the host URL for the nats server +func Host(url string) Option { + return func(no *natsServer.Options, _ *stanServer.Options) { + no.Host = url + } +} + +// Port sets the host URL for the nats server +func Port(port int) Option { + return func(no *natsServer.Options, _ *stanServer.Options) { + no.Port = port + } +} + +// NatsOpts allows setting Options from nats package directly +func NatsOpts(opt func(*natsServer.Options)) Option { + return func(no *natsServer.Options, _ *stanServer.Options) { + opt(no) + } +} + +// StanOpts allows setting Options from stan package directly +func StanOpts(opt func(*stanServer.Options)) Option { + return func(_ *natsServer.Options, so *stanServer.Options) { + opt(so) + } +}