From e141518cf05177ef0f6a8efb3608ec0c1acbf49f Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Mon, 20 May 2019 14:38:35 -0700 Subject: [PATCH] Support passive queue declaration in amqp_consumer (#5831) --- plugins/inputs/amqp_consumer/README.md | 8 +- plugins/inputs/amqp_consumer/amqp_consumer.go | 134 ++++++++++++------ plugins/outputs/amqp/README.md | 2 +- plugins/outputs/amqp/amqp.go | 2 +- plugins/outputs/amqp/client.go | 4 + 5 files changed, 99 insertions(+), 51 deletions(-) diff --git a/plugins/inputs/amqp_consumer/README.md b/plugins/inputs/amqp_consumer/README.md index 84371ba4d9546..53fca513dd6df 100644 --- a/plugins/inputs/amqp_consumer/README.md +++ b/plugins/inputs/amqp_consumer/README.md @@ -27,7 +27,7 @@ The following defaults are known to work with RabbitMQ: # username = "" # password = "" - ## Exchange to declare and consume from. + ## Name of the exchange to declare. If unset, no exchange will be declared. exchange = "telegraf" ## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash". @@ -49,7 +49,11 @@ The following defaults are known to work with RabbitMQ: ## AMQP queue durability can be "transient" or "durable". queue_durability = "durable" - ## Binding Key + ## If true, queue will be passively declared. + # queue_passive = false + + ## A binding between the exchange and queue using this binding key is + ## created. If unset, no binding is created. binding_key = "#" ## Maximum number of messages server should give to the worker. diff --git a/plugins/inputs/amqp_consumer/amqp_consumer.go b/plugins/inputs/amqp_consumer/amqp_consumer.go index 994a3736a391c..6cf6004f54596 100644 --- a/plugins/inputs/amqp_consumer/amqp_consumer.go +++ b/plugins/inputs/amqp_consumer/amqp_consumer.go @@ -41,6 +41,7 @@ type AMQPConsumer struct { // Queue Name Queue string `toml:"queue"` QueueDurability string `toml:"queue_durability"` + QueuePassive bool `toml:"queue_passive"` // Binding Key BindingKey string `toml:"binding_key"` @@ -101,7 +102,7 @@ func (a *AMQPConsumer) SampleConfig() string { # username = "" # password = "" - ## Exchange to declare and consume from. + ## Name of the exchange to declare. If unset, no exchange will be declared. exchange = "telegraf" ## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash". @@ -123,7 +124,11 @@ func (a *AMQPConsumer) SampleConfig() string { ## AMQP queue durability can be "transient" or "durable". queue_durability = "durable" - ## Binding Key. + ## If true, queue will be passively declared. + # queue_passive = false + + ## A binding between the exchange and queue using this binding key is + ## created. If unset, no binding is created. binding_key = "#" ## Maximum number of messages server should give to the worker. @@ -286,59 +291,52 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err return nil, fmt.Errorf("Failed to open a channel: %s", err) } - var exchangeDurable = true - switch a.ExchangeDurability { - case "transient": - exchangeDurable = false - default: - exchangeDurable = true - } + if a.Exchange != "" { + var exchangeDurable = true + switch a.ExchangeDurability { + case "transient": + exchangeDurable = false + default: + exchangeDurable = true + } - exchangeArgs := make(amqp.Table, len(a.ExchangeArguments)) - for k, v := range a.ExchangeArguments { - exchangeArgs[k] = v + exchangeArgs := make(amqp.Table, len(a.ExchangeArguments)) + for k, v := range a.ExchangeArguments { + exchangeArgs[k] = v + } + + err = declareExchange( + ch, + a.Exchange, + a.ExchangeType, + a.ExchangePassive, + exchangeDurable, + exchangeArgs) + if err != nil { + return nil, err + } } - err = declareExchange( + q, err := declareQueue( ch, - a.Exchange, - a.ExchangeType, - a.ExchangePassive, - exchangeDurable, - exchangeArgs) + a.Queue, + a.QueueDurability, + a.QueuePassive) if err != nil { return nil, err } - var queueDurable = true - switch a.QueueDurability { - case "transient": - queueDurable = false - default: - queueDurable = true - } - - q, err := ch.QueueDeclare( - a.Queue, // queue - queueDurable, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments - ) - if err != nil { - return nil, fmt.Errorf("Failed to declare a queue: %s", err) - } - - err = ch.QueueBind( - q.Name, // queue - a.BindingKey, // binding-key - a.Exchange, // exchange - false, - nil, - ) - if err != nil { - return nil, fmt.Errorf("Failed to bind a queue: %s", err) + if a.BindingKey != "" { + err = ch.QueueBind( + q.Name, // queue + a.BindingKey, // binding-key + a.Exchange, // exchange + false, + nil, + ) + if err != nil { + return nil, fmt.Errorf("Failed to bind a queue: %s", err) + } } err = ch.Qos( @@ -402,6 +400,48 @@ func declareExchange( return nil } +func declareQueue( + channel *amqp.Channel, + queueName string, + queueDurability string, + queuePassive bool, +) (*amqp.Queue, error) { + var queue amqp.Queue + var err error + + var queueDurable = true + switch queueDurability { + case "transient": + queueDurable = false + default: + queueDurable = true + } + + if queuePassive { + queue, err = channel.QueueDeclarePassive( + queueName, // queue + queueDurable, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + } else { + queue, err = channel.QueueDeclare( + queueName, // queue + queueDurable, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + } + if err != nil { + return nil, fmt.Errorf("error declaring queue: %v", err) + } + return &queue, nil +} + // Read messages from queue and add them to the Accumulator func (a *AMQPConsumer) process(ctx context.Context, msgs <-chan amqp.Delivery, ac telegraf.Accumulator) { a.deliveries = make(map[telegraf.TrackingID]amqp.Delivery) diff --git a/plugins/outputs/amqp/README.md b/plugins/outputs/amqp/README.md index 68470a2c082a6..f810a0a7b239b 100644 --- a/plugins/outputs/amqp/README.md +++ b/plugins/outputs/amqp/README.md @@ -33,7 +33,7 @@ For an introduction to AMQP see: # exchange_type = "topic" ## If true, exchange will be passively declared. - # exchange_declare_passive = false + # exchange_passive = false ## Exchange durability can be either "transient" or "durable". # exchange_durability = "durable" diff --git a/plugins/outputs/amqp/amqp.go b/plugins/outputs/amqp/amqp.go index 4350f2e74233a..f82faef64d29e 100644 --- a/plugins/outputs/amqp/amqp.go +++ b/plugins/outputs/amqp/amqp.go @@ -92,7 +92,7 @@ var sampleConfig = ` # exchange_type = "topic" ## If true, exchange will be passively declared. - # exchange_declare_passive = false + # exchange_passive = false ## Exchange durability can be either "transient" or "durable". # exchange_durability = "durable" diff --git a/plugins/outputs/amqp/client.go b/plugins/outputs/amqp/client.go index 5e0dc3b49d2fb..8c230b706b09a 100644 --- a/plugins/outputs/amqp/client.go +++ b/plugins/outputs/amqp/client.go @@ -78,6 +78,10 @@ func Connect(config *ClientConfig) (*client, error) { } func (c *client) DeclareExchange() error { + if c.config.exchange == "" { + return nil + } + var err error if c.config.exchangePassive { err = c.channel.ExchangeDeclarePassive(