Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CHANGED] Simplified JetStream API changes and improvements #1300

Merged
merged 4 commits into from
Jun 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/jetstream/js-consume/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func main() {
log.Fatal(err)
}

cons, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{
cons, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "TestConsumerConsume",
AckPolicy: jetstream.AckExplicitPolicy,
})
Expand Down
2 changes: 1 addition & 1 deletion examples/jetstream/js-fetch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func main() {
log.Fatal(err)
}

cons, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{
cons, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "TestConsumerListener",
AckPolicy: jetstream.AckExplicitPolicy,
})
Expand Down
2 changes: 1 addition & 1 deletion examples/jetstream/js-messages/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func main() {
log.Fatal(err)
}

cons, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{
cons, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "TestConsumerMessages",
AckPolicy: jetstream.AckExplicitPolicy,
})
Expand Down
2 changes: 1 addition & 1 deletion examples/jetstream/js-next/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func main() {
log.Fatal(err)
}

cons, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{
cons, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "TestConsumerListener",
AckPolicy: jetstream.AckExplicitPolicy,
})
Expand Down
2 changes: 1 addition & 1 deletion examples/jetstream/js-parallel-consume/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func main() {
log.Fatal(err)
}

cons, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{
cons, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "TestConsumerParallelConsume",
AckPolicy: jetstream.AckExplicitPolicy,
})
Expand Down
12 changes: 6 additions & 6 deletions jetstream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func main() {
}

// Create durable consumer
c, _ := s.AddConsumer(ctx, jetstream.ConsumerConfig{
c, _ := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "CONS",
AckPolicy: jetstream.AckExplicitPolicy,
})
Expand Down Expand Up @@ -253,13 +253,13 @@ CRUD operations on consumers can be achieved on 2 levels:
js, _ := jetstream.New(nc)

// create a consumer (this is an idempotent operation)
cons, _ := js.AddConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
cons, _ := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
Durable: "foo",
AckPolicy: jetstream.AckExplicitPolicy,
})

// create an ephemeral pull consumer by not providing `Durable`
ephemeral, _ := js.AddConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
ephemeral, _ := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
AckPolicy: jetstream.AckExplicitPolicy,
})

Expand All @@ -280,7 +280,7 @@ js, _ := jetstream.New(nc)
stream, _ := js.Stream(ctx, "ORDERS")

// create consumer
cons, _ := stream.AddConsumer(ctx, jetstream.ConsumerConfig{
cons, _ := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "foo",
AckPolicy: jetstream.AckExplicitPolicy,
})
Expand Down Expand Up @@ -422,7 +422,7 @@ value.
##### Using `Consume()` receive messages in a callback

```go
cons, _ := js.AddConsumer("ORDERS", jetstream.ConsumerConfig{
cons, _ := js.CreateOrUpdateConsumer("ORDERS", jetstream.ConsumerConfig{
AckPolicy: jetstream.AckExplicitPolicy,
// receive messages from ORDERS.A subject only
FilterSubject: "ORDERS.A"
Expand Down Expand Up @@ -557,7 +557,7 @@ case err := <-ackF.Err():
}

// similarly to syncronous publish, there is a helper method accepting subject and data
ackF, err = js.PublishAsync(ctx, "ORDERS.new", []byte("hello"))
ackF, err = js.PublishAsync("ORDERS.new", []byte("hello"))
```

Just as for synchronous publish, `PublishAsync()` and `PublishMsgAsync()` accept
Expand Down
6 changes: 0 additions & 6 deletions jetstream/consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,13 @@ type (
SampleFrequency string `json:"sample_freq,omitempty"`
MaxWaiting int `json:"max_waiting,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
FlowControl bool `json:"flow_control,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
HeadersOnly bool `json:"headers_only,omitempty"`

// Pull based options.
MaxRequestBatch int `json:"max_batch,omitempty"`
MaxRequestExpires time.Duration `json:"max_expires,omitempty"`
MaxRequestMaxBytes int `json:"max_bytes,omitempty"`

// Push based consumers.
DeliverSubject string `json:"deliver_subject,omitempty"`
DeliverGroup string `json:"deliver_group,omitempty"`

// Inactivity threshold.
InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`

Expand Down
14 changes: 7 additions & 7 deletions jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type (
// Create, update and get operations return 'Stream' interface,
// allowing operations on consumers
//
// AddConsumer, Consumer and DeleteConsumer are helper methods used to create/fetch/remove consumer without fetching stream (bypassing stream API)
// CreateOrUpdateConsumer, Consumer and DeleteConsumer are helper methods used to create/fetch/remove consumer without fetching stream (bypassing stream API)
//
// Client returns a JetStremClient, used to publish messages on a stream or fetch messages by sequence number
JetStream interface {
Expand All @@ -52,10 +52,10 @@ type (
PublishMsg(context.Context, *nats.Msg, ...PublishOpt) (*PubAck, error)
// PublishAsync performs a asynchronous publish to a stream and returns [PubAckFuture] interface
// It accepts subject name (which must be bound to a stream) and message data
PublishAsync(context.Context, string, []byte, ...PublishOpt) (PubAckFuture, error)
PublishAsync(string, []byte, ...PublishOpt) (PubAckFuture, error)
// PublishMsgAsync performs a asynchronous publish to a stream and returns [PubAckFuture] interface
// It accepts subject name (which must be bound to a stream) and nats.Message
PublishMsgAsync(context.Context, *nats.Msg, ...PublishOpt) (PubAckFuture, error)
PublishMsgAsync(*nats.Msg, ...PublishOpt) (PubAckFuture, error)
// PublishAsyncPending returns the number of async publishes outstanding for this context
PublishAsyncPending() int
// PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd
Expand All @@ -80,10 +80,10 @@ type (
}

StreamConsumerManager interface {
// AddConsumer creates a consumer on a given stream with given config.
// CreateOrUpdateConsumer creates a consumer on a given stream with given config.
// If consumer already exists, it will be updated (if possible).
// Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages)
AddConsumer(context.Context, string, ConsumerConfig) (Consumer, error)
CreateOrUpdateConsumer(context.Context, string, ConsumerConfig) (Consumer, error)
// OrderedConsumer returns an OrderedConsumer instance.
// OrderedConsumer allows fetching messages from a stream (just like standard consumer),
// for in order delivery of messages. Underlying consumer is re-created when necessary,
Expand Down Expand Up @@ -449,10 +449,10 @@ func (js *jetStream) DeleteStream(ctx context.Context, name string) error {
return nil
}

// AddConsumer creates a consumer on a given stream with given config
// CreateOrUpdateConsumer creates a consumer on a given stream with given config
// This operation is idempotent - if a consumer already exists, it will be a no-op (or error if configs do not match)
// Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages)
func (js *jetStream) AddConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error) {
func (js *jetStream) CreateOrUpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error) {
if err := validateStreamName(stream); err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions jetstream/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ func (t PullThresholdMessages) configureConsume(opts *consumeOpts) error {

// PullThresholdBytes sets the byte count on which Consume will trigger
// new pull request to the server. Defaults to 50% of MaxBytes (if set).
type PullThresholBytes int
type PullThresholdBytes int

func (t PullThresholBytes) configureConsume(opts *consumeOpts) error {
func (t PullThresholdBytes) configureConsume(opts *consumeOpts) error {
opts.ThresholdBytes = int(t)
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion jetstream/ordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func (c *orderedConsumer) reset() error {
return fmt.Errorf("%w: maximum number of create consumer attempts reached: %s", ErrOrderedConsumerReset, err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
cons, err = c.jetStream.AddConsumer(ctx, c.stream, *consumerConfig)
cons, err = c.jetStream.CreateOrUpdateConsumer(ctx, c.stream, *consumerConfig)
if err != nil {
if errors.Is(err, ErrConsumerNotFound) {
cancel()
Expand Down
10 changes: 5 additions & 5 deletions jetstream/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type (
}

// MsgErrHandler is used to process asynchronous errors from
// JetStream PublishAsynjs. It will return the original
// JetStream PublishAsync. It will return the original
// message sent to the server for possible retransmitting and the error encountered.
MsgErrHandler func(JetStream, *nats.Msg, error)

Expand Down Expand Up @@ -200,11 +200,11 @@ func (js *jetStream) PublishMsg(ctx context.Context, m *nats.Msg, opts ...Publis
return ackResp.PubAck, nil
}

func (js *jetStream) PublishAsync(ctx context.Context, subj string, data []byte, opts ...PublishOpt) (PubAckFuture, error) {
return js.PublishMsgAsync(ctx, &nats.Msg{Subject: subj, Data: data}, opts...)
func (js *jetStream) PublishAsync(subj string, data []byte, opts ...PublishOpt) (PubAckFuture, error) {
return js.PublishMsgAsync(&nats.Msg{Subject: subj, Data: data}, opts...)
}

func (js *jetStream) PublishMsgAsync(ctx context.Context, m *nats.Msg, opts ...PublishOpt) (PubAckFuture, error) {
func (js *jetStream) PublishMsgAsync(m *nats.Msg, opts ...PublishOpt) (PubAckFuture, error) {
var o pubOpts
if len(opts) > 0 {
if m.Header == nil {
Expand Down Expand Up @@ -309,7 +309,7 @@ func (js *jetStream) newAsyncReply() (string, error) {
return sb.String(), nil
}

// Handle an async reply from PublishAsynjs.
// Handle an async reply from PublishAsync.
func (js *jetStream) handleAsyncReply(m *nats.Msg) {
if len(m.Subject) <= aReplyPreLen {
return
Expand Down
6 changes: 3 additions & 3 deletions jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ type (
}

streamConsumerManager interface {
// AddConsumer creates a consumer on a given stream with given config.
// CreateOrUpdateConsumer creates a consumer on a given stream with given config.
// If consumer already exists, it will be updated (if possible).
// Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages).
AddConsumer(context.Context, ConsumerConfig) (Consumer, error)
CreateOrUpdateConsumer(context.Context, ConsumerConfig) (Consumer, error)

// OrderedConsumer returns an OrderedConsumer instance.
// OrderedConsumer allows fetching messages from a stream (just like standard consumer),
Expand Down Expand Up @@ -193,7 +193,7 @@ type (
}
)

func (s *stream) AddConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error) {
func (s *stream) CreateOrUpdateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error) {
return upsertConsumer(ctx, s.jetStream, s.name, cfg)
}

Expand Down
8 changes: 4 additions & 4 deletions jetstream/test/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestConsumerInfo(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{
c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "cons",
AckPolicy: jetstream.AckExplicitPolicy,
Description: "test consumer",
Expand All @@ -66,7 +66,7 @@ func TestConsumerInfo(t *testing.T) {
}

// update consumer and see if info is updated
_, err = s.AddConsumer(ctx, jetstream.ConsumerConfig{
_, err = s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "cons",
AckPolicy: jetstream.AckExplicitPolicy,
Description: "updated consumer",
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestConsumerCachedInfo(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{
c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "cons",
AckPolicy: jetstream.AckExplicitPolicy,
Description: "test consumer",
Expand All @@ -159,7 +159,7 @@ func TestConsumerCachedInfo(t *testing.T) {
}

// update consumer and see if info is updated
_, err = s.AddConsumer(ctx, jetstream.ConsumerConfig{
_, err = s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "cons",
AckPolicy: jetstream.AckExplicitPolicy,
Description: "updated consumer",
Expand Down
8 changes: 4 additions & 4 deletions jetstream/test/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ func TestStreamNames(t *testing.T) {
}
}

func TestJetStream_AddConsumer(t *testing.T) {
func TestJetStream_CreateOrUpdateConsumer(t *testing.T) {
tests := []struct {
name string
stream string
Expand Down Expand Up @@ -743,7 +743,7 @@ func TestJetStream_AddConsumer(t *testing.T) {
} else {
sub, err = nc.SubscribeSync("$JS.API.CONSUMER.CREATE.foo.*")
}
c, err := js.AddConsumer(ctx, test.stream, test.consumerConfig)
c, err := js.CreateOrUpdateConsumer(ctx, test.stream, test.consumerConfig)
if test.withError != nil {
if err == nil || !errors.Is(err, test.withError) {
t.Fatalf("Expected error: %v; got: %v", test.withError, err)
Expand Down Expand Up @@ -823,7 +823,7 @@ func TestJetStream_Consumer(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
_, err = s.AddConsumer(ctx, jetstream.ConsumerConfig{Durable: "dur", AckPolicy: jetstream.AckAllPolicy, Description: "desc"})
_, err = s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{Durable: "dur", AckPolicy: jetstream.AckAllPolicy, Description: "desc"})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -904,7 +904,7 @@ func TestJetStream_DeleteConsumer(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
_, err = s.AddConsumer(ctx, jetstream.ConsumerConfig{Durable: "dur", AckPolicy: jetstream.AckAllPolicy, Description: "desc"})
_, err = s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{Durable: "dur", AckPolicy: jetstream.AckAllPolicy, Description: "desc"})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions jetstream/test/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestMessageDetails(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{
c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "cons",
AckPolicy: jetstream.AckExplicitPolicy,
Description: "test consumer",
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestAckVariants(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{
c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "cons",
AckPolicy: jetstream.AckExplicitPolicy,
Description: "test consumer",
Expand Down
8 changes: 4 additions & 4 deletions jetstream/test/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1131,7 +1131,7 @@ func TestPublishMsgAsync(t *testing.T) {
}

for _, pub := range test.msgs {
ackFuture, err := js.PublishMsgAsync(ctx, pub.msg, pub.opts...)
ackFuture, err := js.PublishMsgAsync(pub.msg, pub.opts...)
if pub.withPublishError != nil {
pub.withPublishError(t, err)
continue
Expand Down Expand Up @@ -1192,7 +1192,7 @@ func TestPublishMsgAsyncWithPendingMsgs(t *testing.T) {
}

for i := 0; i < 20; i++ {
_, err = js.PublishAsync(ctx, "FOO.1", []byte("msg"))
_, err = js.PublishAsync("FOO.1", []byte("msg"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -1224,12 +1224,12 @@ func TestPublishMsgAsyncWithPendingMsgs(t *testing.T) {
}

for i := 0; i < 5; i++ {
_, err = js.PublishAsync(ctx, "FOO.1", []byte("msg"), jetstream.WithStallWait(1*time.Nanosecond))
_, err = js.PublishAsync("FOO.1", []byte("msg"), jetstream.WithStallWait(1*time.Nanosecond))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
if _, err = js.PublishAsync(ctx, "FOO.1", []byte("msg"), jetstream.WithStallWait(1*time.Nanosecond)); err == nil || !errors.Is(err, jetstream.ErrTooManyStalledMsgs) {
if _, err = js.PublishAsync("FOO.1", []byte("msg"), jetstream.WithStallWait(1*time.Nanosecond)); err == nil || !errors.Is(err, jetstream.ErrTooManyStalledMsgs) {
t.Fatalf("Expected error: %v; got: %v", jetstream.ErrTooManyStalledMsgs, err)
}
})
Expand Down
Loading