Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support passive queue declaration in amqp_consumer #5831

Merged
merged 4 commits into from
May 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions plugins/inputs/amqp_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ The following defaults are known to work with RabbitMQ:
# username = ""
# password = ""

## Exchange to declare and consume from.
## Name of the exchange to declare. If unset, no exchange will be declared.
exchange = "telegraf"

## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash".
Expand All @@ -49,7 +49,11 @@ The following defaults are known to work with RabbitMQ:
## AMQP queue durability can be "transient" or "durable".
queue_durability = "durable"

## Binding Key
## If true, queue will be passively declared.
# queue_passive = false

## A binding between the exchange and queue using this binding key is
## created. If unset, no binding is created.
binding_key = "#"

## Maximum number of messages server should give to the worker.
Expand Down
134 changes: 87 additions & 47 deletions plugins/inputs/amqp_consumer/amqp_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type AMQPConsumer struct {
// Queue Name
Queue string `toml:"queue"`
QueueDurability string `toml:"queue_durability"`
QueuePassive bool `toml:"queue_passive"`

// Binding Key
BindingKey string `toml:"binding_key"`
Expand Down Expand Up @@ -97,7 +98,7 @@ func (a *AMQPConsumer) SampleConfig() string {
# username = ""
# password = ""

## Exchange to declare and consume from.
## Name of the exchange to declare. If unset, no exchange will be declared.
exchange = "telegraf"

## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash".
Expand All @@ -119,7 +120,11 @@ func (a *AMQPConsumer) SampleConfig() string {
## AMQP queue durability can be "transient" or "durable".
queue_durability = "durable"

## Binding Key.
## If true, queue will be passively declared.
# queue_passive = false

## A binding between the exchange and queue using this binding key is
## created. If unset, no binding is created.
binding_key = "#"

## Maximum number of messages server should give to the worker.
Expand Down Expand Up @@ -273,59 +278,52 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err
return nil, fmt.Errorf("Failed to open a channel: %s", err)
}

var exchangeDurable = true
switch a.ExchangeDurability {
case "transient":
exchangeDurable = false
default:
exchangeDurable = true
}
if a.Exchange != "" {
var exchangeDurable = true
switch a.ExchangeDurability {
case "transient":
exchangeDurable = false
default:
exchangeDurable = true
}

exchangeArgs := make(amqp.Table, len(a.ExchangeArguments))
for k, v := range a.ExchangeArguments {
exchangeArgs[k] = v
exchangeArgs := make(amqp.Table, len(a.ExchangeArguments))
for k, v := range a.ExchangeArguments {
exchangeArgs[k] = v
}

err = declareExchange(
ch,
a.Exchange,
a.ExchangeType,
a.ExchangePassive,
exchangeDurable,
exchangeArgs)
if err != nil {
return nil, err
}
}

err = declareExchange(
q, err := declareQueue(
ch,
a.Exchange,
a.ExchangeType,
a.ExchangePassive,
exchangeDurable,
exchangeArgs)
a.Queue,
a.QueueDurability,
a.QueuePassive)
if err != nil {
return nil, err
}

var queueDurable = true
switch a.QueueDurability {
case "transient":
queueDurable = false
default:
queueDurable = true
}

q, err := ch.QueueDeclare(
a.Queue, // queue
queueDurable, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, fmt.Errorf("Failed to declare a queue: %s", err)
}

err = ch.QueueBind(
q.Name, // queue
a.BindingKey, // binding-key
a.Exchange, // exchange
false,
nil,
)
if err != nil {
return nil, fmt.Errorf("Failed to bind a queue: %s", err)
if a.BindingKey != "" {
err = ch.QueueBind(
q.Name, // queue
a.BindingKey, // binding-key
a.Exchange, // exchange
false,
nil,
)
if err != nil {
return nil, fmt.Errorf("Failed to bind a queue: %s", err)
}
}

err = ch.Qos(
Expand Down Expand Up @@ -389,6 +387,48 @@ func declareExchange(
return nil
}

func declareQueue(
channel *amqp.Channel,
queueName string,
queueDurability string,
queuePassive bool,
) (*amqp.Queue, error) {
var queue amqp.Queue
var err error

var queueDurable = true
switch queueDurability {
case "transient":
queueDurable = false
default:
queueDurable = true
}

if queuePassive {
queue, err = channel.QueueDeclarePassive(
queueName, // queue
queueDurable, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
} else {
queue, err = channel.QueueDeclare(
queueName, // queue
queueDurable, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
}
if err != nil {
return nil, fmt.Errorf("error declaring queue: %v", err)
}
return &queue, nil
}

// Read messages from queue and add them to the Accumulator
func (a *AMQPConsumer) process(ctx context.Context, msgs <-chan amqp.Delivery, ac telegraf.Accumulator) {
a.deliveries = make(map[telegraf.TrackingID]amqp.Delivery)
Expand Down
2 changes: 1 addition & 1 deletion plugins/outputs/amqp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ For an introduction to AMQP see:
# exchange_type = "topic"

## If true, exchange will be passively declared.
# exchange_declare_passive = false
# exchange_passive = false

## Exchange durability can be either "transient" or "durable".
# exchange_durability = "durable"
Expand Down
3 changes: 1 addition & 2 deletions plugins/outputs/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"

"github.com/streadway/amqp"
)

Expand Down Expand Up @@ -91,7 +90,7 @@ var sampleConfig = `
# exchange_type = "topic"

## If true, exchange will be passively declared.
# exchange_declare_passive = false
# exchange_passive = false

## Exchange durability can be either "transient" or "durable".
# exchange_durability = "durable"
Expand Down
4 changes: 4 additions & 0 deletions plugins/outputs/amqp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func Connect(config *ClientConfig) (*client, error) {
}

func (c *client) DeclareExchange() error {
if c.config.exchange == "" {
return nil
}

var err error
if c.config.exchangePassive {
err = c.channel.ExchangeDeclarePassive(
Expand Down