diff --git a/v4/events/natsjs/nats.go b/v4/events/natsjs/nats.go index a521aefd..f8b64c94 100644 --- a/v4/events/natsjs/nats.go +++ b/v4/events/natsjs/nats.go @@ -112,6 +112,16 @@ func (s *stream) Publish(topic string, msg interface{}, opts ...events.PublishOp } // publish the event to the topic's channel + // publish synchronously if configured + if s.opts.SyncPublish { + _, err := s.natsJetStreamCtx.Publish(event.Topic, bytes) + if err != nil { + err = errors.Wrap(err, "Error publishing message to topic") + } + return err + } + + // publish asynchronously by default if _, err := s.natsJetStreamCtx.PublishAsync(event.Topic, bytes); err != nil { return errors.Wrap(err, "Error publishing message to topic") } diff --git a/v4/events/natsjs/options.go b/v4/events/natsjs/options.go index 59ab8fea..fdecd87a 100644 --- a/v4/events/natsjs/options.go +++ b/v4/events/natsjs/options.go @@ -8,11 +8,12 @@ import ( // Options which are used to configure the nats stream. type Options struct { - ClusterID string - ClientID string - Address string - TLSConfig *tls.Config - Logger logger.Logger + ClusterID string + ClientID string + Address string + TLSConfig *tls.Config + Logger logger.Logger + SyncPublish bool } // Option is a function which configures options. @@ -52,3 +53,10 @@ func Logger(log logger.Logger) Option { o.Logger = log } } + +// SynchronousPublish allows using a synchronous publishing instead of the default asynchronous +func SynchronousPublish(sync bool) Option { + return func(o *Options) { + o.SyncPublish = sync + } +}