Skip to content

Commit

Permalink
fix(events/natsjs): allow synchronous publishing in Nats (#116)
Browse files Browse the repository at this point in the history
  • Loading branch information
kobergj authored Jul 6, 2023
1 parent a179a6b commit ea37907
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 5 deletions.
10 changes: 10 additions & 0 deletions v4/events/natsjs/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,16 @@ func (s *stream) Publish(topic string, msg interface{}, opts ...events.PublishOp
}

// publish the event to the topic's channel
// publish synchronously if configured
if s.opts.SyncPublish {
_, err := s.natsJetStreamCtx.Publish(event.Topic, bytes)
if err != nil {
err = errors.Wrap(err, "Error publishing message to topic")
}
return err
}

// publish asynchronously by default
if _, err := s.natsJetStreamCtx.PublishAsync(event.Topic, bytes); err != nil {
return errors.Wrap(err, "Error publishing message to topic")
}
Expand Down
18 changes: 13 additions & 5 deletions v4/events/natsjs/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (

// Options which are used to configure the nats stream.
type Options struct {
ClusterID string
ClientID string
Address string
TLSConfig *tls.Config
Logger logger.Logger
ClusterID string
ClientID string
Address string
TLSConfig *tls.Config
Logger logger.Logger
SyncPublish bool
}

// Option is a function which configures options.
Expand Down Expand Up @@ -52,3 +53,10 @@ func Logger(log logger.Logger) Option {
o.Logger = log
}
}

// SynchronousPublish allows using a synchronous publishing instead of the default asynchronous
func SynchronousPublish(sync bool) Option {
return func(o *Options) {
o.SyncPublish = sync
}
}

0 comments on commit ea37907

Please sign in to comment.