diff --git a/v4/events/natsjs/nats.go b/v4/events/natsjs/nats.go index 0b4a984b..f46c7a4d 100644 --- a/v4/events/natsjs/nats.go +++ b/v4/events/natsjs/nats.go @@ -203,8 +203,10 @@ func (s *stream) Consume(topic string, opts ...events.ConsumeOption) (<-chan eve } // setup the options - subOpts := []nats.SubOpt{ - nats.Durable(options.Group), + subOpts := []nats.SubOpt{} + + if !s.opts.DisableDurableStreams { + subOpts = append(subOpts, nats.Durable(options.Group)) } if options.CustomRetries { diff --git a/v4/events/natsjs/options.go b/v4/events/natsjs/options.go index 8418628a..05f7b443 100644 --- a/v4/events/natsjs/options.go +++ b/v4/events/natsjs/options.go @@ -8,14 +8,15 @@ import ( // Options which are used to configure the nats stream. type Options struct { - ClusterID string - ClientID string - Address string - NkeyConfig string - TLSConfig *tls.Config - Logger logger.Logger - SyncPublish bool - Name string + ClusterID string + ClientID string + Address string + NkeyConfig string + TLSConfig *tls.Config + Logger logger.Logger + SyncPublish bool + Name string + DisableDurableStreams bool } // Option is a function which configures options. @@ -76,3 +77,10 @@ func Name(name string) Option { o.Name = name } } + +// DisableDurableStreams will disable durable streams +func DisableDurableStreams() Option { + return func(o *Options) { + o.DisableDurableStreams = true + } +}