Skip to content

Commit

Permalink
[CHANGED] Simplified JetStream API changes and improvements (#1300)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio authored Jun 8, 2023
1 parent 15f03a8 commit abd4cb0
Show file tree
Hide file tree
Showing 18 changed files with 89 additions and 95 deletions.
2 changes: 1 addition & 1 deletion examples/jetstream/js-consume/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func main() {
log.Fatal(err)
}

cons, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{
cons, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "TestConsumerConsume",
AckPolicy: jetstream.AckExplicitPolicy,
})
Expand Down
2 changes: 1 addition & 1 deletion examples/jetstream/js-fetch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func main() {
log.Fatal(err)
}

cons, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{
cons, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "TestConsumerListener",
AckPolicy: jetstream.AckExplicitPolicy,
})
Expand Down
2 changes: 1 addition & 1 deletion examples/jetstream/js-messages/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func main() {
log.Fatal(err)
}

cons, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{
cons, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "TestConsumerMessages",
AckPolicy: jetstream.AckExplicitPolicy,
})
Expand Down
2 changes: 1 addition & 1 deletion examples/jetstream/js-next/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func main() {
log.Fatal(err)
}

cons, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{
cons, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "TestConsumerListener",
AckPolicy: jetstream.AckExplicitPolicy,
})
Expand Down
2 changes: 1 addition & 1 deletion examples/jetstream/js-parallel-consume/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func main() {
log.Fatal(err)
}

cons, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{
cons, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "TestConsumerParallelConsume",
AckPolicy: jetstream.AckExplicitPolicy,
})
Expand Down
12 changes: 6 additions & 6 deletions jetstream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func main() {
}

// Create durable consumer
c, _ := s.AddConsumer(ctx, jetstream.ConsumerConfig{
c, _ := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "CONS",
AckPolicy: jetstream.AckExplicitPolicy,
})
Expand Down Expand Up @@ -253,13 +253,13 @@ CRUD operations on consumers can be achieved on 2 levels:
js, _ := jetstream.New(nc)

// create a consumer (this is an idempotent operation)
cons, _ := js.AddConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
cons, _ := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
Durable: "foo",
AckPolicy: jetstream.AckExplicitPolicy,
})

// create an ephemeral pull consumer by not providing `Durable`
ephemeral, _ := js.AddConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
ephemeral, _ := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
AckPolicy: jetstream.AckExplicitPolicy,
})

Expand All @@ -280,7 +280,7 @@ js, _ := jetstream.New(nc)
stream, _ := js.Stream(ctx, "ORDERS")

// create consumer
cons, _ := stream.AddConsumer(ctx, jetstream.ConsumerConfig{
cons, _ := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "foo",
AckPolicy: jetstream.AckExplicitPolicy,
})
Expand Down Expand Up @@ -422,7 +422,7 @@ value.
##### Using `Consume()` receive messages in a callback

```go
cons, _ := js.AddConsumer("ORDERS", jetstream.ConsumerConfig{
cons, _ := js.CreateOrUpdateConsumer("ORDERS", jetstream.ConsumerConfig{
AckPolicy: jetstream.AckExplicitPolicy,
// receive messages from ORDERS.A subject only
FilterSubject: "ORDERS.A"
Expand Down Expand Up @@ -557,7 +557,7 @@ case err := <-ackF.Err():
}

// similarly to syncronous publish, there is a helper method accepting subject and data
ackF, err = js.PublishAsync(ctx, "ORDERS.new", []byte("hello"))
ackF, err = js.PublishAsync("ORDERS.new", []byte("hello"))
```

Just as for synchronous publish, `PublishAsync()` and `PublishMsgAsync()` accept
Expand Down
6 changes: 0 additions & 6 deletions jetstream/consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,13 @@ type (
SampleFrequency string `json:"sample_freq,omitempty"`
MaxWaiting int `json:"max_waiting,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
FlowControl bool `json:"flow_control,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
HeadersOnly bool `json:"headers_only,omitempty"`

// Pull based options.
MaxRequestBatch int `json:"max_batch,omitempty"`
MaxRequestExpires time.Duration `json:"max_expires,omitempty"`
MaxRequestMaxBytes int `json:"max_bytes,omitempty"`

// Push based consumers.
DeliverSubject string `json:"deliver_subject,omitempty"`
DeliverGroup string `json:"deliver_group,omitempty"`

// Inactivity threshold.
InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`

Expand Down
14 changes: 7 additions & 7 deletions jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type (
// Create, update and get operations return 'Stream' interface,
// allowing operations on consumers
//
// AddConsumer, Consumer and DeleteConsumer are helper methods used to create/fetch/remove consumer without fetching stream (bypassing stream API)
// CreateOrUpdateConsumer, Consumer and DeleteConsumer are helper methods used to create/fetch/remove consumer without fetching stream (bypassing stream API)
//
// Client returns a JetStremClient, used to publish messages on a stream or fetch messages by sequence number
JetStream interface {
Expand All @@ -52,10 +52,10 @@ type (
PublishMsg(context.Context, *nats.Msg, ...PublishOpt) (*PubAck, error)
// PublishAsync performs a asynchronous publish to a stream and returns [PubAckFuture] interface
// It accepts subject name (which must be bound to a stream) and message data
PublishAsync(context.Context, string, []byte, ...PublishOpt) (PubAckFuture, error)
PublishAsync(string, []byte, ...PublishOpt) (PubAckFuture, error)
// PublishMsgAsync performs a asynchronous publish to a stream and returns [PubAckFuture] interface
// It accepts subject name (which must be bound to a stream) and nats.Message
PublishMsgAsync(context.Context, *nats.Msg, ...PublishOpt) (PubAckFuture, error)
PublishMsgAsync(*nats.Msg, ...PublishOpt) (PubAckFuture, error)
// PublishAsyncPending returns the number of async publishes outstanding for this context
PublishAsyncPending() int
// PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd
Expand All @@ -80,10 +80,10 @@ type (
}

StreamConsumerManager interface {
// AddConsumer creates a consumer on a given stream with given config.
// CreateOrUpdateConsumer creates a consumer on a given stream with given config.
// If consumer already exists, it will be updated (if possible).
// Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages)
AddConsumer(context.Context, string, ConsumerConfig) (Consumer, error)
CreateOrUpdateConsumer(context.Context, string, ConsumerConfig) (Consumer, error)
// OrderedConsumer returns an OrderedConsumer instance.
// OrderedConsumer allows fetching messages from a stream (just like standard consumer),
// for in order delivery of messages. Underlying consumer is re-created when necessary,
Expand Down Expand Up @@ -449,10 +449,10 @@ func (js *jetStream) DeleteStream(ctx context.Context, name string) error {
return nil
}

// AddConsumer creates a consumer on a given stream with given config
// CreateOrUpdateConsumer creates a consumer on a given stream with given config
// This operation is idempotent - if a consumer already exists, it will be a no-op (or error if configs do not match)
// Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages)
func (js *jetStream) AddConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error) {
func (js *jetStream) CreateOrUpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error) {
if err := validateStreamName(stream); err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions jetstream/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ func (t PullThresholdMessages) configureConsume(opts *consumeOpts) error {

// PullThresholdBytes sets the byte count on which Consume will trigger
// new pull request to the server. Defaults to 50% of MaxBytes (if set).
type PullThresholBytes int
type PullThresholdBytes int

func (t PullThresholBytes) configureConsume(opts *consumeOpts) error {
func (t PullThresholdBytes) configureConsume(opts *consumeOpts) error {
opts.ThresholdBytes = int(t)
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion jetstream/ordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func (c *orderedConsumer) reset() error {
return fmt.Errorf("%w: maximum number of create consumer attempts reached: %s", ErrOrderedConsumerReset, err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
cons, err = c.jetStream.AddConsumer(ctx, c.stream, *consumerConfig)
cons, err = c.jetStream.CreateOrUpdateConsumer(ctx, c.stream, *consumerConfig)
if err != nil {
if errors.Is(err, ErrConsumerNotFound) {
cancel()
Expand Down
10 changes: 5 additions & 5 deletions jetstream/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type (
}

// MsgErrHandler is used to process asynchronous errors from
// JetStream PublishAsynjs. It will return the original
// JetStream PublishAsync. It will return the original
// message sent to the server for possible retransmitting and the error encountered.
MsgErrHandler func(JetStream, *nats.Msg, error)

Expand Down Expand Up @@ -200,11 +200,11 @@ func (js *jetStream) PublishMsg(ctx context.Context, m *nats.Msg, opts ...Publis
return ackResp.PubAck, nil
}

func (js *jetStream) PublishAsync(ctx context.Context, subj string, data []byte, opts ...PublishOpt) (PubAckFuture, error) {
return js.PublishMsgAsync(ctx, &nats.Msg{Subject: subj, Data: data}, opts...)
func (js *jetStream) PublishAsync(subj string, data []byte, opts ...PublishOpt) (PubAckFuture, error) {
return js.PublishMsgAsync(&nats.Msg{Subject: subj, Data: data}, opts...)
}

func (js *jetStream) PublishMsgAsync(ctx context.Context, m *nats.Msg, opts ...PublishOpt) (PubAckFuture, error) {
func (js *jetStream) PublishMsgAsync(m *nats.Msg, opts ...PublishOpt) (PubAckFuture, error) {
var o pubOpts
if len(opts) > 0 {
if m.Header == nil {
Expand Down Expand Up @@ -309,7 +309,7 @@ func (js *jetStream) newAsyncReply() (string, error) {
return sb.String(), nil
}

// Handle an async reply from PublishAsynjs.
// Handle an async reply from PublishAsync.
func (js *jetStream) handleAsyncReply(m *nats.Msg) {
if len(m.Subject) <= aReplyPreLen {
return
Expand Down
6 changes: 3 additions & 3 deletions jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ type (
}

streamConsumerManager interface {
// AddConsumer creates a consumer on a given stream with given config.
// CreateOrUpdateConsumer creates a consumer on a given stream with given config.
// If consumer already exists, it will be updated (if possible).
// Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages).
AddConsumer(context.Context, ConsumerConfig) (Consumer, error)
CreateOrUpdateConsumer(context.Context, ConsumerConfig) (Consumer, error)

// OrderedConsumer returns an OrderedConsumer instance.
// OrderedConsumer allows fetching messages from a stream (just like standard consumer),
Expand Down Expand Up @@ -193,7 +193,7 @@ type (
}
)

func (s *stream) AddConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error) {
func (s *stream) CreateOrUpdateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error) {
return upsertConsumer(ctx, s.jetStream, s.name, cfg)
}

Expand Down
8 changes: 4 additions & 4 deletions jetstream/test/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestConsumerInfo(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{
c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "cons",
AckPolicy: jetstream.AckExplicitPolicy,
Description: "test consumer",
Expand All @@ -66,7 +66,7 @@ func TestConsumerInfo(t *testing.T) {
}

// update consumer and see if info is updated
_, err = s.AddConsumer(ctx, jetstream.ConsumerConfig{
_, err = s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "cons",
AckPolicy: jetstream.AckExplicitPolicy,
Description: "updated consumer",
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestConsumerCachedInfo(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{
c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "cons",
AckPolicy: jetstream.AckExplicitPolicy,
Description: "test consumer",
Expand All @@ -159,7 +159,7 @@ func TestConsumerCachedInfo(t *testing.T) {
}

// update consumer and see if info is updated
_, err = s.AddConsumer(ctx, jetstream.ConsumerConfig{
_, err = s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "cons",
AckPolicy: jetstream.AckExplicitPolicy,
Description: "updated consumer",
Expand Down
8 changes: 4 additions & 4 deletions jetstream/test/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ func TestStreamNames(t *testing.T) {
}
}

func TestJetStream_AddConsumer(t *testing.T) {
func TestJetStream_CreateOrUpdateConsumer(t *testing.T) {
tests := []struct {
name string
stream string
Expand Down Expand Up @@ -743,7 +743,7 @@ func TestJetStream_AddConsumer(t *testing.T) {
} else {
sub, err = nc.SubscribeSync("$JS.API.CONSUMER.CREATE.foo.*")
}
c, err := js.AddConsumer(ctx, test.stream, test.consumerConfig)
c, err := js.CreateOrUpdateConsumer(ctx, test.stream, test.consumerConfig)
if test.withError != nil {
if err == nil || !errors.Is(err, test.withError) {
t.Fatalf("Expected error: %v; got: %v", test.withError, err)
Expand Down Expand Up @@ -823,7 +823,7 @@ func TestJetStream_Consumer(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
_, err = s.AddConsumer(ctx, jetstream.ConsumerConfig{Durable: "dur", AckPolicy: jetstream.AckAllPolicy, Description: "desc"})
_, err = s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{Durable: "dur", AckPolicy: jetstream.AckAllPolicy, Description: "desc"})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -904,7 +904,7 @@ func TestJetStream_DeleteConsumer(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
_, err = s.AddConsumer(ctx, jetstream.ConsumerConfig{Durable: "dur", AckPolicy: jetstream.AckAllPolicy, Description: "desc"})
_, err = s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{Durable: "dur", AckPolicy: jetstream.AckAllPolicy, Description: "desc"})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions jetstream/test/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestMessageDetails(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{
c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "cons",
AckPolicy: jetstream.AckExplicitPolicy,
Description: "test consumer",
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestAckVariants(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
c, err := s.AddConsumer(ctx, jetstream.ConsumerConfig{
c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "cons",
AckPolicy: jetstream.AckExplicitPolicy,
Description: "test consumer",
Expand Down
8 changes: 4 additions & 4 deletions jetstream/test/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1131,7 +1131,7 @@ func TestPublishMsgAsync(t *testing.T) {
}

for _, pub := range test.msgs {
ackFuture, err := js.PublishMsgAsync(ctx, pub.msg, pub.opts...)
ackFuture, err := js.PublishMsgAsync(pub.msg, pub.opts...)
if pub.withPublishError != nil {
pub.withPublishError(t, err)
continue
Expand Down Expand Up @@ -1192,7 +1192,7 @@ func TestPublishMsgAsyncWithPendingMsgs(t *testing.T) {
}

for i := 0; i < 20; i++ {
_, err = js.PublishAsync(ctx, "FOO.1", []byte("msg"))
_, err = js.PublishAsync("FOO.1", []byte("msg"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -1224,12 +1224,12 @@ func TestPublishMsgAsyncWithPendingMsgs(t *testing.T) {
}

for i := 0; i < 5; i++ {
_, err = js.PublishAsync(ctx, "FOO.1", []byte("msg"), jetstream.WithStallWait(1*time.Nanosecond))
_, err = js.PublishAsync("FOO.1", []byte("msg"), jetstream.WithStallWait(1*time.Nanosecond))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
if _, err = js.PublishAsync(ctx, "FOO.1", []byte("msg"), jetstream.WithStallWait(1*time.Nanosecond)); err == nil || !errors.Is(err, jetstream.ErrTooManyStalledMsgs) {
if _, err = js.PublishAsync("FOO.1", []byte("msg"), jetstream.WithStallWait(1*time.Nanosecond)); err == nil || !errors.Is(err, jetstream.ErrTooManyStalledMsgs) {
t.Fatalf("Expected error: %v; got: %v", jetstream.ErrTooManyStalledMsgs, err)
}
})
Expand Down
Loading

0 comments on commit abd4cb0

Please sign in to comment.