Skip to content

Commit

Permalink
added support for new and last_per_subject delivery strategies
Browse files Browse the repository at this point in the history
Signed-off-by: Daan Gerits <[email protected]>
  • Loading branch information
calmera committed Mar 6, 2024
1 parent 92506a1 commit f9bf3df
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 3 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ site
TODO.md
release_notes.md
.idea
.vscode
.vscode
.op
10 changes: 8 additions & 2 deletions internal/impl/nats/input_jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ You can access these metadata fields using
Description("Indicates that the subscription should use an existing consumer.").
Optional()).
Field(service.NewStringAnnotatedEnumField("deliver", map[string]string{
"all": "Deliver all available messages.",
"last": "Deliver starting with the last published messages.",
"all": "Deliver all available messages.",
"last": "Deliver starting with the last published messages.",
"last_per_subject": "Deliver starting with the last published message per subject.",
"new": "Deliver starting from now, not taking into account any previous messages.",
}).
Description("Determines which messages to deliver when consuming without a durable subscriber.").
Default("all")).
Expand Down Expand Up @@ -153,6 +155,10 @@ func newJetStreamReaderFromConfig(conf *service.ParsedConfig, mgr *service.Resou
j.deliverOpt = nats.DeliverAll()
case "last":
j.deliverOpt = nats.DeliverLast()
case "last_per_subject":
j.deliverOpt = nats.DeliverLastPerSubject()
case "new":
j.deliverOpt = nats.DeliverNew()
default:
return nil, fmt.Errorf("deliver option %v was not recognised", deliver)
}
Expand Down
2 changes: 2 additions & 0 deletions website/docs/components/inputs/nats_jetstream.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ Default: `"all"`
|---|---|
| `all` | Deliver all available messages. |
| `last` | Deliver starting with the last published messages. |
| `last_per_subject` | Deliver starting with the last published message per subject. |
| `new` | Deliver starting from now, not taking into account any previous messages. |


### `ack_wait`
Expand Down

0 comments on commit f9bf3df

Please sign in to comment.