Skip to content

Commit

Permalink
Add constants for Queue arguments (#145)
Browse files Browse the repository at this point in the history
* Add constants for Queue arguments

Queues accept optional arguments during queue declaration. Most of those
arguments can be set using a policy (recommended), however, queue type
must be set at queue declaration. To pave the way for Quorum and Stream
queue adoption, this commit adds constants to easily set the queue type.

Other common used arguments are part of this commit. Additional queue
arguments can be added as needed/requested by the community.

Signed-off-by: Aitor Pérez Cedres <[email protected]>

* Add value even though it is not strictly necessary

Signed-off-by: Aitor Pérez Cedres <[email protected]>
Co-authored-by: Luke Bakken <[email protected]>
  • Loading branch information
Zerpet and lukebakken authored Dec 22, 2022
1 parent 5bf455f commit 0c7af02
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 0 deletions.
36 changes: 36 additions & 0 deletions examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,3 +503,39 @@ func refreshJWToken(token string) (string, error) {
// do OAuth2 things to refresh tokens
return "so fresh!", nil
}

func ExampleChannel_QueueDeclare_quorum() {
conn, _ := amqp.Dial("amqp://localhost")
ch, _ := conn.Channel()
args := amqp.Table{ // queue args
amqp.QueueTypeArg: amqp.QueueTypeQuorum,
}
q, _ := ch.QueueDeclare(
"my-quorum-queue", // queue name
true, // durable
false, // auto-delete
false, // exclusive
false, // noWait
args,
)
log.Printf("Declared queue: %s with arguments: %v", q.Name, args)
}

func ExampleChannel_QueueDeclare_stream() {
conn, _ := amqp.Dial("amqp://localhost")
ch, _ := conn.Channel()
q, _ := ch.QueueDeclare(
"my-stream-queue", // queue name
true, // durable
false, // auto-delete
false, // exclusive
false, // noWait
amqp.Table{ // queue args
amqp.QueueTypeArg: amqp.QueueTypeStream,
amqp.StreamMaxLenBytesArg: 5_000_000_000, // 5 Gb
amqp.StreamMaxSegmentSizeBytesArg: 500_000_000, // 500 Mb
amqp.StreamMaxAgeArg: "3D", // 3 days
},
)
log.Printf("Declared queue: %s", q.Name)
}
62 changes: 62 additions & 0 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,68 @@ type Decimal struct {
Value int32
}

// Most common queue argument keys in queue declaration. For a comprehensive list
// of queue arguments, visit [RabbitMQ Queue docs].
//
// QueueTypeArg queue argument is used to declare quorum and stream queues.
// Accepted values are QueueTypeClassic (default), QueueTypeQuorum and
// QueueTypeStream. [Quorum Queues] accept (almost) all queue arguments as their
// Classic Queues counterparts. Check [feature comparison] docs for more
// information.
//
// Queues can define their [max length] using QueueMaxLenArg and
// QueueMaxLenBytesArg queue arguments. Overflow behaviour is set using
// QueueOverflowArg. Accepted values are QueueOverflowDropHead (default),
// QueueOverflowRejectPublish and QueueOverflowRejectPublishDLX.
//
// [Queue TTL] can be defined using QueueTTLArg. That is, the time-to-live for an
// unused queue. [Queue Message TTL] can be defined using QueueMessageTTLArg.
// This will set a time-to-live for **messages** in the queue.
//
// [Stream retention] can be configured using StreamMaxLenBytesArg, to set the
// maximum size of the stream. Please note that stream queues always keep, at
// least, one segment. [Stream retention] can also be set using StreamMaxAgeArg,
// to set time-based retention. Values are string with unit suffix. Valid
// suffixes are Y, M, D, h, m, s. E.g. "7D" for one week. The maximum segment
// size can be set using StreamMaxSegmentSizeBytesArg. The default value is
// 500_000_000 bytes ~= 500 megabytes
//
// [RabbitMQ Queue docs]: https://rabbitmq.com/queues.html
// [Stream retention]: https://rabbitmq.com/streams.html#retention
// [max length]: https://rabbitmq.com/maxlength.html
// [Queue TTL]: https://rabbitmq.com/ttl.html#queue-ttl
// [Queue Message TTL]: https://rabbitmq.com/ttl.html#per-queue-message-ttl
// [Quorum Queues]: https://rabbitmq.com/quorum-queues.html
// [feature comparison]: https://rabbitmq.com/quorum-queues.html#feature-comparison
const (
QueueTypeArg = "x-queue-type"
QueueMaxLenArg = "x-max-length"
QueueMaxLenBytesArg = "x-max-length-bytes"
StreamMaxLenBytesArg = "x-max-length-bytes"
QueueOverflowArg = "x-overflow"
QueueMessageTTLArg = "x-message-ttl"
QueueTTLArg = "x-expires"
StreamMaxAgeArg = "x-max-age"
StreamMaxSegmentSizeBytesArg = "x-stream-max-segment-size-bytes"
)

// Values for queue arguments. Use as values for queue arguments during queue declaration.
// The following argument table will create a classic queue, with max length set to 100 messages,
// and a queue TTL of 30 minutes.
// args := amqp.Table{
// amqp.QueueTypeArg: QueueTypeClassic,
// amqp.QueueMaxLenArg: 100,
// amqp.QueueTTLArg: 1800000,
// }
const (
QueueTypeClassic = "classic"
QueueTypeQuorum = "quorum"
QueueTypeStream = "stream"
QueueOverflowDropHead = "drop-head"
QueueOverflowRejectPublish = "reject-publish"
QueueOverflowRejectPublishDLX = "reject-publish-dlx"
)

// Table stores user supplied fields of the following types:
//
// bool
Expand Down

0 comments on commit 0c7af02

Please sign in to comment.