diff --git a/examples/jetstream/js-consume/main.go b/examples/jetstream/js-consume/main.go index eab191c72..1922486fc 100644 --- a/examples/jetstream/js-consume/main.go +++ b/examples/jetstream/js-consume/main.go @@ -47,7 +47,7 @@ func main() { log.Fatal(err) } - cons, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{ + cons, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ Durable: "TestConsumerConsume", AckPolicy: jetstream.AckExplicitPolicy, }) diff --git a/examples/jetstream/js-fetch/main.go b/examples/jetstream/js-fetch/main.go index 7177821e8..d14ff902b 100644 --- a/examples/jetstream/js-fetch/main.go +++ b/examples/jetstream/js-fetch/main.go @@ -44,7 +44,7 @@ func main() { log.Fatal(err) } - cons, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{ + cons, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ Durable: "TestConsumerListener", AckPolicy: jetstream.AckExplicitPolicy, }) diff --git a/examples/jetstream/js-messages/main.go b/examples/jetstream/js-messages/main.go index be88ba552..893093e45 100644 --- a/examples/jetstream/js-messages/main.go +++ b/examples/jetstream/js-messages/main.go @@ -44,7 +44,7 @@ func main() { log.Fatal(err) } - cons, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{ + cons, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ Durable: "TestConsumerMessages", AckPolicy: jetstream.AckExplicitPolicy, }) diff --git a/examples/jetstream/js-next/main.go b/examples/jetstream/js-next/main.go index 649d8ea46..278cea757 100644 --- a/examples/jetstream/js-next/main.go +++ b/examples/jetstream/js-next/main.go @@ -44,7 +44,7 @@ func main() { log.Fatal(err) } - cons, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{ + cons, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ Durable: "TestConsumerListener", AckPolicy: jetstream.AckExplicitPolicy, }) diff --git a/examples/jetstream/js-parallel-consume/main.go b/examples/jetstream/js-parallel-consume/main.go index e5b6efd58..4d59b0a01 100644 --- a/examples/jetstream/js-parallel-consume/main.go +++ b/examples/jetstream/js-parallel-consume/main.go @@ -47,7 +47,7 @@ func main() { log.Fatal(err) } - cons, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{ + cons, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ Durable: "TestConsumerParallelConsume", AckPolicy: jetstream.AckExplicitPolicy, }) diff --git a/jetstream/README.md b/jetstream/README.md index 08f4ca8b4..81d7b27f6 100644 --- a/jetstream/README.md +++ b/jetstream/README.md @@ -92,7 +92,7 @@ func main() { } // Create durable consumer - c, _ := s.AddConsumer(ctx, jetstream.ConsumerConfig{ + c, _ := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ Durable: "CONS", AckPolicy: jetstream.AckExplicitPolicy, }) @@ -253,13 +253,13 @@ CRUD operations on consumers can be achieved on 2 levels: js, _ := jetstream.New(nc) // create a consumer (this is an idempotent operation) -cons, _ := js.AddConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{ +cons, _ := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{ Durable: "foo", AckPolicy: jetstream.AckExplicitPolicy, }) // create an ephemeral pull consumer by not providing `Durable` -ephemeral, _ := js.AddConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{ +ephemeral, _ := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{ AckPolicy: jetstream.AckExplicitPolicy, }) @@ -280,7 +280,7 @@ js, _ := jetstream.New(nc) stream, _ := js.Stream(ctx, "ORDERS") // create consumer -cons, _ := stream.AddConsumer(ctx, jetstream.ConsumerConfig{ +cons, _ := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ Durable: "foo", AckPolicy: jetstream.AckExplicitPolicy, }) @@ -422,7 +422,7 @@ value. ##### Using `Consume()` receive messages in a callback ```go -cons, _ := js.AddConsumer("ORDERS", jetstream.ConsumerConfig{ +cons, _ := js.CreateOrUpdateConsumer("ORDERS", jetstream.ConsumerConfig{ AckPolicy: jetstream.AckExplicitPolicy, // receive messages from ORDERS.A subject only FilterSubject: "ORDERS.A" @@ -557,7 +557,7 @@ case err := <-ackF.Err(): } // similarly to syncronous publish, there is a helper method accepting subject and data -ackF, err = js.PublishAsync(ctx, "ORDERS.new", []byte("hello")) +ackF, err = js.PublishAsync("ORDERS.new", []byte("hello")) ``` Just as for synchronous publish, `PublishAsync()` and `PublishMsgAsync()` accept diff --git a/jetstream/consumer_config.go b/jetstream/consumer_config.go index 750e83cf4..37c6e12f8 100644 --- a/jetstream/consumer_config.go +++ b/jetstream/consumer_config.go @@ -55,8 +55,6 @@ type ( SampleFrequency string `json:"sample_freq,omitempty"` MaxWaiting int `json:"max_waiting,omitempty"` MaxAckPending int `json:"max_ack_pending,omitempty"` - FlowControl bool `json:"flow_control,omitempty"` - Heartbeat time.Duration `json:"idle_heartbeat,omitempty"` HeadersOnly bool `json:"headers_only,omitempty"` // Pull based options. @@ -64,10 +62,6 @@ type ( MaxRequestExpires time.Duration `json:"max_expires,omitempty"` MaxRequestMaxBytes int `json:"max_bytes,omitempty"` - // Push based consumers. - DeliverSubject string `json:"deliver_subject,omitempty"` - DeliverGroup string `json:"deliver_group,omitempty"` - // Inactivity threshold. InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"` diff --git a/jetstream/jetstream.go b/jetstream/jetstream.go index 563699ac2..a26b12e26 100644 --- a/jetstream/jetstream.go +++ b/jetstream/jetstream.go @@ -31,7 +31,7 @@ type ( // Create, update and get operations return 'Stream' interface, // allowing operations on consumers // - // AddConsumer, Consumer and DeleteConsumer are helper methods used to create/fetch/remove consumer without fetching stream (bypassing stream API) + // CreateOrUpdateConsumer, Consumer and DeleteConsumer are helper methods used to create/fetch/remove consumer without fetching stream (bypassing stream API) // // Client returns a JetStremClient, used to publish messages on a stream or fetch messages by sequence number JetStream interface { @@ -52,10 +52,10 @@ type ( PublishMsg(context.Context, *nats.Msg, ...PublishOpt) (*PubAck, error) // PublishAsync performs a asynchronous publish to a stream and returns [PubAckFuture] interface // It accepts subject name (which must be bound to a stream) and message data - PublishAsync(context.Context, string, []byte, ...PublishOpt) (PubAckFuture, error) + PublishAsync(string, []byte, ...PublishOpt) (PubAckFuture, error) // PublishMsgAsync performs a asynchronous publish to a stream and returns [PubAckFuture] interface // It accepts subject name (which must be bound to a stream) and nats.Message - PublishMsgAsync(context.Context, *nats.Msg, ...PublishOpt) (PubAckFuture, error) + PublishMsgAsync(*nats.Msg, ...PublishOpt) (PubAckFuture, error) // PublishAsyncPending returns the number of async publishes outstanding for this context PublishAsyncPending() int // PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd @@ -80,10 +80,10 @@ type ( } StreamConsumerManager interface { - // AddConsumer creates a consumer on a given stream with given config. + // CreateOrUpdateConsumer creates a consumer on a given stream with given config. // If consumer already exists, it will be updated (if possible). // Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages) - AddConsumer(context.Context, string, ConsumerConfig) (Consumer, error) + CreateOrUpdateConsumer(context.Context, string, ConsumerConfig) (Consumer, error) // OrderedConsumer returns an OrderedConsumer instance. // OrderedConsumer allows fetching messages from a stream (just like standard consumer), // for in order delivery of messages. Underlying consumer is re-created when necessary, @@ -449,10 +449,10 @@ func (js *jetStream) DeleteStream(ctx context.Context, name string) error { return nil } -// AddConsumer creates a consumer on a given stream with given config +// CreateOrUpdateConsumer creates a consumer on a given stream with given config // This operation is idempotent - if a consumer already exists, it will be a no-op (or error if configs do not match) // Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages) -func (js *jetStream) AddConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error) { +func (js *jetStream) CreateOrUpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error) { if err := validateStreamName(stream); err != nil { return nil, err } diff --git a/jetstream/options.go b/jetstream/options.go index 801651002..51e72ab95 100644 --- a/jetstream/options.go +++ b/jetstream/options.go @@ -166,9 +166,9 @@ func (t PullThresholdMessages) configureConsume(opts *consumeOpts) error { // PullThresholdBytes sets the byte count on which Consume will trigger // new pull request to the server. Defaults to 50% of MaxBytes (if set). -type PullThresholBytes int +type PullThresholdBytes int -func (t PullThresholBytes) configureConsume(opts *consumeOpts) error { +func (t PullThresholdBytes) configureConsume(opts *consumeOpts) error { opts.ThresholdBytes = int(t) return nil } diff --git a/jetstream/ordered.go b/jetstream/ordered.go index 3043e60e5..a3864860b 100644 --- a/jetstream/ordered.go +++ b/jetstream/ordered.go @@ -358,7 +358,7 @@ func (c *orderedConsumer) reset() error { return fmt.Errorf("%w: maximum number of create consumer attempts reached: %s", ErrOrderedConsumerReset, err) } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - cons, err = c.jetStream.AddConsumer(ctx, c.stream, *consumerConfig) + cons, err = c.jetStream.CreateOrUpdateConsumer(ctx, c.stream, *consumerConfig) if err != nil { if errors.Is(err, ErrConsumerNotFound) { cancel() diff --git a/jetstream/publish.go b/jetstream/publish.go index 6ba999b21..1b49d2f2c 100644 --- a/jetstream/publish.go +++ b/jetstream/publish.go @@ -81,7 +81,7 @@ type ( } // MsgErrHandler is used to process asynchronous errors from - // JetStream PublishAsynjs. It will return the original + // JetStream PublishAsync. It will return the original // message sent to the server for possible retransmitting and the error encountered. MsgErrHandler func(JetStream, *nats.Msg, error) @@ -200,11 +200,11 @@ func (js *jetStream) PublishMsg(ctx context.Context, m *nats.Msg, opts ...Publis return ackResp.PubAck, nil } -func (js *jetStream) PublishAsync(ctx context.Context, subj string, data []byte, opts ...PublishOpt) (PubAckFuture, error) { - return js.PublishMsgAsync(ctx, &nats.Msg{Subject: subj, Data: data}, opts...) +func (js *jetStream) PublishAsync(subj string, data []byte, opts ...PublishOpt) (PubAckFuture, error) { + return js.PublishMsgAsync(&nats.Msg{Subject: subj, Data: data}, opts...) } -func (js *jetStream) PublishMsgAsync(ctx context.Context, m *nats.Msg, opts ...PublishOpt) (PubAckFuture, error) { +func (js *jetStream) PublishMsgAsync(m *nats.Msg, opts ...PublishOpt) (PubAckFuture, error) { var o pubOpts if len(opts) > 0 { if m.Header == nil { @@ -309,7 +309,7 @@ func (js *jetStream) newAsyncReply() (string, error) { return sb.String(), nil } -// Handle an async reply from PublishAsynjs. +// Handle an async reply from PublishAsync. func (js *jetStream) handleAsyncReply(m *nats.Msg) { if len(m.Subject) <= aReplyPreLen { return diff --git a/jetstream/stream.go b/jetstream/stream.go index 7b70a1cc0..4139e6b2a 100644 --- a/jetstream/stream.go +++ b/jetstream/stream.go @@ -51,10 +51,10 @@ type ( } streamConsumerManager interface { - // AddConsumer creates a consumer on a given stream with given config. + // CreateOrUpdateConsumer creates a consumer on a given stream with given config. // If consumer already exists, it will be updated (if possible). // Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages). - AddConsumer(context.Context, ConsumerConfig) (Consumer, error) + CreateOrUpdateConsumer(context.Context, ConsumerConfig) (Consumer, error) // OrderedConsumer returns an OrderedConsumer instance. // OrderedConsumer allows fetching messages from a stream (just like standard consumer), @@ -193,7 +193,7 @@ type ( } ) -func (s *stream) AddConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error) { +func (s *stream) CreateOrUpdateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error) { return upsertConsumer(ctx, s.jetStream, s.name, cfg) } diff --git a/jetstream/test/consumer_test.go b/jetstream/test/consumer_test.go index 5978fc844..bae2ff494 100644 --- a/jetstream/test/consumer_test.go +++ b/jetstream/test/consumer_test.go @@ -44,7 +44,7 @@ func TestConsumerInfo(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{ + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ Durable: "cons", AckPolicy: jetstream.AckExplicitPolicy, Description: "test consumer", @@ -66,7 +66,7 @@ func TestConsumerInfo(t *testing.T) { } // update consumer and see if info is updated - _, err = s.AddConsumer(ctx, jetstream.ConsumerConfig{ + _, err = s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ Durable: "cons", AckPolicy: jetstream.AckExplicitPolicy, Description: "updated consumer", @@ -140,7 +140,7 @@ func TestConsumerCachedInfo(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{ + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ Durable: "cons", AckPolicy: jetstream.AckExplicitPolicy, Description: "test consumer", @@ -159,7 +159,7 @@ func TestConsumerCachedInfo(t *testing.T) { } // update consumer and see if info is updated - _, err = s.AddConsumer(ctx, jetstream.ConsumerConfig{ + _, err = s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ Durable: "cons", AckPolicy: jetstream.AckExplicitPolicy, Description: "updated consumer", diff --git a/jetstream/test/jetstream_test.go b/jetstream/test/jetstream_test.go index 3416c1410..d22c5ba87 100644 --- a/jetstream/test/jetstream_test.go +++ b/jetstream/test/jetstream_test.go @@ -666,7 +666,7 @@ func TestStreamNames(t *testing.T) { } } -func TestJetStream_AddConsumer(t *testing.T) { +func TestJetStream_CreateOrUpdateConsumer(t *testing.T) { tests := []struct { name string stream string @@ -743,7 +743,7 @@ func TestJetStream_AddConsumer(t *testing.T) { } else { sub, err = nc.SubscribeSync("$JS.API.CONSUMER.CREATE.foo.*") } - c, err := js.AddConsumer(ctx, test.stream, test.consumerConfig) + c, err := js.CreateOrUpdateConsumer(ctx, test.stream, test.consumerConfig) if test.withError != nil { if err == nil || !errors.Is(err, test.withError) { t.Fatalf("Expected error: %v; got: %v", test.withError, err) @@ -823,7 +823,7 @@ func TestJetStream_Consumer(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - _, err = s.AddConsumer(ctx, jetstream.ConsumerConfig{Durable: "dur", AckPolicy: jetstream.AckAllPolicy, Description: "desc"}) + _, err = s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{Durable: "dur", AckPolicy: jetstream.AckAllPolicy, Description: "desc"}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -904,7 +904,7 @@ func TestJetStream_DeleteConsumer(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - _, err = s.AddConsumer(ctx, jetstream.ConsumerConfig{Durable: "dur", AckPolicy: jetstream.AckAllPolicy, Description: "desc"}) + _, err = s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{Durable: "dur", AckPolicy: jetstream.AckAllPolicy, Description: "desc"}) if err != nil { t.Fatalf("Unexpected error: %v", err) } diff --git a/jetstream/test/message_test.go b/jetstream/test/message_test.go index 5dff04e48..99cb129bf 100644 --- a/jetstream/test/message_test.go +++ b/jetstream/test/message_test.go @@ -44,7 +44,7 @@ func TestMessageDetails(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{ + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ Durable: "cons", AckPolicy: jetstream.AckExplicitPolicy, Description: "test consumer", @@ -104,7 +104,7 @@ func TestAckVariants(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{ + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ Durable: "cons", AckPolicy: jetstream.AckExplicitPolicy, Description: "test consumer", diff --git a/jetstream/test/publish_test.go b/jetstream/test/publish_test.go index b566f280c..a0683e288 100644 --- a/jetstream/test/publish_test.go +++ b/jetstream/test/publish_test.go @@ -1131,7 +1131,7 @@ func TestPublishMsgAsync(t *testing.T) { } for _, pub := range test.msgs { - ackFuture, err := js.PublishMsgAsync(ctx, pub.msg, pub.opts...) + ackFuture, err := js.PublishMsgAsync(pub.msg, pub.opts...) if pub.withPublishError != nil { pub.withPublishError(t, err) continue @@ -1192,7 +1192,7 @@ func TestPublishMsgAsyncWithPendingMsgs(t *testing.T) { } for i := 0; i < 20; i++ { - _, err = js.PublishAsync(ctx, "FOO.1", []byte("msg")) + _, err = js.PublishAsync("FOO.1", []byte("msg")) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1224,12 +1224,12 @@ func TestPublishMsgAsyncWithPendingMsgs(t *testing.T) { } for i := 0; i < 5; i++ { - _, err = js.PublishAsync(ctx, "FOO.1", []byte("msg"), jetstream.WithStallWait(1*time.Nanosecond)) + _, err = js.PublishAsync("FOO.1", []byte("msg"), jetstream.WithStallWait(1*time.Nanosecond)) if err != nil { t.Fatalf("Unexpected error: %v", err) } } - if _, err = js.PublishAsync(ctx, "FOO.1", []byte("msg"), jetstream.WithStallWait(1*time.Nanosecond)); err == nil || !errors.Is(err, jetstream.ErrTooManyStalledMsgs) { + if _, err = js.PublishAsync("FOO.1", []byte("msg"), jetstream.WithStallWait(1*time.Nanosecond)); err == nil || !errors.Is(err, jetstream.ErrTooManyStalledMsgs) { t.Fatalf("Expected error: %v; got: %v", jetstream.ErrTooManyStalledMsgs, err) } }) diff --git a/jetstream/test/pull_test.go b/jetstream/test/pull_test.go index a3e155c4d..811572ec2 100644 --- a/jetstream/test/pull_test.go +++ b/jetstream/test/pull_test.go @@ -56,7 +56,7 @@ func TestPullConsumerFetch(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -102,7 +102,7 @@ func TestPullConsumerFetch(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -152,7 +152,7 @@ func TestPullConsumerFetch(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -220,7 +220,7 @@ func TestPullConsumerFetch(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -258,7 +258,7 @@ func TestPullConsumerFetch(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -305,7 +305,7 @@ func TestPullConsumerFetch(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -341,7 +341,7 @@ func TestPullConsumerFetch(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -384,7 +384,7 @@ func TestPullConsumerFetchBytes(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy, Name: "con"}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy, Name: "con"}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -429,7 +429,7 @@ func TestPullConsumerFetchBytes(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy, Name: "con"}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy, Name: "con"}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -473,7 +473,7 @@ func TestPullConsumerFetchBytes(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy, Name: "con"}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy, Name: "con"}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -518,7 +518,7 @@ func TestPullConsumerFetchBytes(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy, Name: "con"}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy, Name: "con"}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -582,7 +582,7 @@ func TestPullConsumerFetch_WithCluster(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -625,7 +625,7 @@ func TestPullConsumerFetch_WithCluster(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -676,7 +676,7 @@ func TestPullConsumerMessages(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -738,7 +738,7 @@ func TestPullConsumerMessages(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -794,7 +794,7 @@ func TestPullConsumerMessages(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -864,7 +864,7 @@ func TestPullConsumerMessages(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -932,7 +932,7 @@ func TestPullConsumerMessages(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1001,7 +1001,7 @@ func TestPullConsumerMessages(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1071,7 +1071,7 @@ func TestPullConsumerMessages(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1148,7 +1148,7 @@ func TestPullConsumerMessages(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1178,7 +1178,7 @@ func TestPullConsumerMessages(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1253,7 +1253,7 @@ func TestPullConsumerConsume(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1302,7 +1302,7 @@ func TestPullConsumerConsume(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1360,7 +1360,7 @@ func TestPullConsumerConsume(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1429,7 +1429,7 @@ func TestPullConsumerConsume(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1479,7 +1479,7 @@ func TestPullConsumerConsume(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1529,7 +1529,7 @@ func TestPullConsumerConsume(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1592,7 +1592,7 @@ func TestPullConsumerConsume(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1656,7 +1656,7 @@ func TestPullConsumerConsume(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1688,7 +1688,7 @@ func TestPullConsumerConsume(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1738,7 +1738,7 @@ func TestPullConsumerConsume(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1770,7 +1770,7 @@ func TestPullConsumerConsume(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1818,7 +1818,7 @@ func TestPullConsumerConsume(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1882,7 +1882,7 @@ func TestPullConsumerConsume_WithCluster(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1930,7 +1930,7 @@ func TestPullConsumerConsume_WithCluster(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -2016,7 +2016,7 @@ func TestPullConsumerNext(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -2057,7 +2057,7 @@ func TestPullConsumerNext(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -2094,7 +2094,7 @@ func TestPullConsumerNext(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } diff --git a/jetstream/test/stream_test.go b/jetstream/test/stream_test.go index 354377556..3b53f086f 100644 --- a/jetstream/test/stream_test.go +++ b/jetstream/test/stream_test.go @@ -26,7 +26,7 @@ import ( "github.com/nats-io/nats.go/jetstream" ) -func TestAddConsumer(t *testing.T) { +func TestCreateOrUpdateConsumer(t *testing.T) { tests := []struct { name string consumerConfig jetstream.ConsumerConfig @@ -97,7 +97,7 @@ func TestAddConsumer(t *testing.T) { } else { sub, err = nc.SubscribeSync("$JS.API.CONSUMER.CREATE.foo.*") } - c, err := s.AddConsumer(ctx, test.consumerConfig) + c, err := s.CreateOrUpdateConsumer(ctx, test.consumerConfig) if test.withError != nil { if err == nil || !errors.Is(err, test.withError) { t.Fatalf("Expected error: %v; got: %v", test.withError, err) @@ -164,7 +164,7 @@ func TestConsumer(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - _, err = s.AddConsumer(ctx, jetstream.ConsumerConfig{Durable: "dur", AckPolicy: jetstream.AckAllPolicy, Description: "desc"}) + _, err = s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{Durable: "dur", AckPolicy: jetstream.AckAllPolicy, Description: "desc"}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -229,7 +229,7 @@ func TestDeleteConsumer(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - _, err = s.AddConsumer(ctx, jetstream.ConsumerConfig{Durable: "dur", AckPolicy: jetstream.AckAllPolicy, Description: "desc"}) + _, err = s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{Durable: "dur", AckPolicy: jetstream.AckAllPolicy, Description: "desc"}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -872,7 +872,7 @@ func TestListConsumers(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } for i := 0; i < test.consumersNum; i++ { - _, err = s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + _, err = s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -935,7 +935,7 @@ func TestConsumerNames(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } for i := 0; i < test.consumersNum; i++ { - _, err = s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + _, err = s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1041,7 +1041,7 @@ func TestPurgeStream(t *testing.T) { } return } - c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) }