Skip to content

Commit

Permalink
Support passive queue declaration in amqp_consumer (#5831)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored May 20, 2019
1 parent b5cd9a9 commit e141518
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 51 deletions.
8 changes: 6 additions & 2 deletions plugins/inputs/amqp_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ The following defaults are known to work with RabbitMQ:
# username = ""
# password = ""

## Exchange to declare and consume from.
## Name of the exchange to declare. If unset, no exchange will be declared.
exchange = "telegraf"

## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash".
Expand All @@ -49,7 +49,11 @@ The following defaults are known to work with RabbitMQ:
## AMQP queue durability can be "transient" or "durable".
queue_durability = "durable"

## Binding Key
## If true, queue will be passively declared.
# queue_passive = false

## A binding between the exchange and queue using this binding key is
## created. If unset, no binding is created.
binding_key = "#"

## Maximum number of messages server should give to the worker.
Expand Down
134 changes: 87 additions & 47 deletions plugins/inputs/amqp_consumer/amqp_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type AMQPConsumer struct {
// Queue Name
Queue string `toml:"queue"`
QueueDurability string `toml:"queue_durability"`
QueuePassive bool `toml:"queue_passive"`

// Binding Key
BindingKey string `toml:"binding_key"`
Expand Down Expand Up @@ -101,7 +102,7 @@ func (a *AMQPConsumer) SampleConfig() string {
# username = ""
# password = ""
## Exchange to declare and consume from.
## Name of the exchange to declare. If unset, no exchange will be declared.
exchange = "telegraf"
## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash".
Expand All @@ -123,7 +124,11 @@ func (a *AMQPConsumer) SampleConfig() string {
## AMQP queue durability can be "transient" or "durable".
queue_durability = "durable"
## Binding Key.
## If true, queue will be passively declared.
# queue_passive = false
## A binding between the exchange and queue using this binding key is
## created. If unset, no binding is created.
binding_key = "#"
## Maximum number of messages server should give to the worker.
Expand Down Expand Up @@ -286,59 +291,52 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err
return nil, fmt.Errorf("Failed to open a channel: %s", err)
}

var exchangeDurable = true
switch a.ExchangeDurability {
case "transient":
exchangeDurable = false
default:
exchangeDurable = true
}
if a.Exchange != "" {
var exchangeDurable = true
switch a.ExchangeDurability {
case "transient":
exchangeDurable = false
default:
exchangeDurable = true
}

exchangeArgs := make(amqp.Table, len(a.ExchangeArguments))
for k, v := range a.ExchangeArguments {
exchangeArgs[k] = v
exchangeArgs := make(amqp.Table, len(a.ExchangeArguments))
for k, v := range a.ExchangeArguments {
exchangeArgs[k] = v
}

err = declareExchange(
ch,
a.Exchange,
a.ExchangeType,
a.ExchangePassive,
exchangeDurable,
exchangeArgs)
if err != nil {
return nil, err
}
}

err = declareExchange(
q, err := declareQueue(
ch,
a.Exchange,
a.ExchangeType,
a.ExchangePassive,
exchangeDurable,
exchangeArgs)
a.Queue,
a.QueueDurability,
a.QueuePassive)
if err != nil {
return nil, err
}

var queueDurable = true
switch a.QueueDurability {
case "transient":
queueDurable = false
default:
queueDurable = true
}

q, err := ch.QueueDeclare(
a.Queue, // queue
queueDurable, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, fmt.Errorf("Failed to declare a queue: %s", err)
}

err = ch.QueueBind(
q.Name, // queue
a.BindingKey, // binding-key
a.Exchange, // exchange
false,
nil,
)
if err != nil {
return nil, fmt.Errorf("Failed to bind a queue: %s", err)
if a.BindingKey != "" {
err = ch.QueueBind(
q.Name, // queue
a.BindingKey, // binding-key
a.Exchange, // exchange
false,
nil,
)
if err != nil {
return nil, fmt.Errorf("Failed to bind a queue: %s", err)
}
}

err = ch.Qos(
Expand Down Expand Up @@ -402,6 +400,48 @@ func declareExchange(
return nil
}

func declareQueue(
channel *amqp.Channel,
queueName string,
queueDurability string,
queuePassive bool,
) (*amqp.Queue, error) {
var queue amqp.Queue
var err error

var queueDurable = true
switch queueDurability {
case "transient":
queueDurable = false
default:
queueDurable = true
}

if queuePassive {
queue, err = channel.QueueDeclarePassive(
queueName, // queue
queueDurable, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
} else {
queue, err = channel.QueueDeclare(
queueName, // queue
queueDurable, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
}
if err != nil {
return nil, fmt.Errorf("error declaring queue: %v", err)
}
return &queue, nil
}

// Read messages from queue and add them to the Accumulator
func (a *AMQPConsumer) process(ctx context.Context, msgs <-chan amqp.Delivery, ac telegraf.Accumulator) {
a.deliveries = make(map[telegraf.TrackingID]amqp.Delivery)
Expand Down
2 changes: 1 addition & 1 deletion plugins/outputs/amqp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ For an introduction to AMQP see:
# exchange_type = "topic"

## If true, exchange will be passively declared.
# exchange_declare_passive = false
# exchange_passive = false

## Exchange durability can be either "transient" or "durable".
# exchange_durability = "durable"
Expand Down
2 changes: 1 addition & 1 deletion plugins/outputs/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ var sampleConfig = `
# exchange_type = "topic"
## If true, exchange will be passively declared.
# exchange_declare_passive = false
# exchange_passive = false
## Exchange durability can be either "transient" or "durable".
# exchange_durability = "durable"
Expand Down
4 changes: 4 additions & 0 deletions plugins/outputs/amqp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func Connect(config *ClientConfig) (*client, error) {
}

func (c *client) DeclareExchange() error {
if c.config.exchange == "" {
return nil
}

var err error
if c.config.exchangePassive {
err = c.channel.ExchangeDeclarePassive(
Expand Down

0 comments on commit e141518

Please sign in to comment.