Skip to content

Commit

Permalink
MQTT Consumer Input plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
sparrc committed Feb 12, 2016
1 parent 6c353e8 commit 8cefba9
Show file tree
Hide file tree
Showing 14 changed files with 550 additions and 43 deletions.
3 changes: 1 addition & 2 deletions Godeps
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git dbd8d5c40a582eb9adacde36b47932b3a3ad0034
git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git 617c801af238c3af2d9e72c5d4a0f02edad03ce5
github.com/Shopify/sarama d37c73f2b2bce85f7fa16b6a550d26c5372892ef
github.com/Sirupsen/logrus f7f79f729e0fbe2fcc061db48a9ba0263f588252
github.com/amir/raidman 6a8e089bbe32e6b907feae5ba688841974b3c339
Expand Down Expand Up @@ -30,7 +30,6 @@ github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b
github.com/naoina/toml 751171607256bb66e64c9f0220c00662420c38e9
github.com/nats-io/nats 6a83f1a633cfbfd90aa648ac99fb38c06a8b40df
github.com/nsqio/go-nsq 2118015c120962edc5d03325c680daf3163a8b5f
github.com/pborman/uuid dee7705ef7b324f27ceb85a121c61f2c2e8ce988
github.com/pmezard/go-difflib 792786c7400a136282c1664665ae0a8db921c6c2
github.com/prometheus/client_golang 67994f177195311c3ea3d4407ed0175e34a4256f
github.com/prometheus/client_model fa8ad6fec33561be4280a8f0514318c79d7f6cb6
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/EXAMPLE_README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ The example plugin gathers metrics about example things

### Configuration:

```
```toml
# Description
[[inputs.example]]
# SampleConfig
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/mailchimp"
_ "github.com/influxdata/telegraf/plugins/inputs/memcached"
_ "github.com/influxdata/telegraf/plugins/inputs/mongodb"
_ "github.com/influxdata/telegraf/plugins/inputs/mqtt_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/mysql"
_ "github.com/influxdata/telegraf/plugins/inputs/nats_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/nginx"
Expand Down
31 changes: 24 additions & 7 deletions plugins/inputs/kafka_consumer/README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,34 @@
# Kafka Consumer
# Kafka Consumer Input Plugin

The [Kafka](http://kafka.apache.org/) consumer plugin polls a specified Kafka
topic and adds messages to InfluxDB. The plugin assumes messages follow the
line protocol. [Consumer Group](http://godoc.org/github.com/wvanbergen/kafka/consumergroup)
is used to talk to the Kafka cluster so multiple instances of telegraf can read
from the same topic in parallel.

## Configuration

```toml
# Read metrics from Kafka topic(s)
[[inputs.kafka_consumer]]
### topic(s) to consume
topics = ["telegraf"]
### an array of Zookeeper connection strings
zookeeper_peers = ["localhost:2181"]
### the name of the consumer group
consumer_group = "telegraf_metrics_consumers"
### Maximum number of metrics to buffer between collection intervals
metric_buffer = 100000
### Offset (must be either "oldest" or "newest")
offset = "oldest"

### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
data_format = "influx"
```

## Testing

Running integration tests requires running Zookeeper & Kafka. The following
Expand All @@ -16,9 +39,3 @@ To start Kafka & Zookeeper:
```
docker run -d -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`boot2docker ip || docker-machine ip <your_machine_name>` --env ADVERTISED_PORT=9092 spotify/kafka
```

To run tests:

```
go test
```
33 changes: 19 additions & 14 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ type Kafka struct {
Topics []string
ZookeeperPeers []string
Consumer *consumergroup.ConsumerGroup
PointBuffer int
Offset string
MetricBuffer int
// TODO remove PointBuffer, legacy support
PointBuffer int
Offset string

parser parsers.Parser

Expand All @@ -30,7 +32,7 @@ type Kafka struct {
in <-chan *sarama.ConsumerMessage
// channel for all kafka consumer errors
errs <-chan *sarama.ConsumerError
// channel for all incoming parsed kafka points
// channel for all incoming parsed kafka metrics
metricC chan telegraf.Metric
done chan struct{}

Expand All @@ -46,8 +48,8 @@ var sampleConfig = `
zookeeper_peers = ["localhost:2181"]
### the name of the consumer group
consumer_group = "telegraf_metrics_consumers"
### Maximum number of points to buffer between collection intervals
point_buffer = 100000
### Maximum number of metrics to buffer between collection intervals
metric_buffer = 100000
### Offset (must be either "oldest" or "newest")
offset = "oldest"
Expand Down Expand Up @@ -104,10 +106,13 @@ func (k *Kafka) Start() error {
}

k.done = make(chan struct{})
if k.PointBuffer == 0 {
k.PointBuffer = 100000
if k.PointBuffer == 0 && k.MetricBuffer == 0 {
k.MetricBuffer = 100000
} else if k.PointBuffer > 0 {
// Legacy support of PointBuffer field TODO remove
k.MetricBuffer = k.PointBuffer
}
k.metricC = make(chan telegraf.Metric, k.PointBuffer)
k.metricC = make(chan telegraf.Metric, k.MetricBuffer)

// Start the kafka message reader
go k.receiver()
Expand All @@ -128,7 +133,7 @@ func (k *Kafka) receiver() {
case msg := <-k.in:
metrics, err := k.parser.Parse(msg.Value)
if err != nil {
log.Printf("Could not parse kafka message: %s, error: %s",
log.Printf("KAFKA PARSE ERROR\nmessage: %s\nerror: %s",
string(msg.Value), err.Error())
}

Expand All @@ -139,7 +144,7 @@ func (k *Kafka) receiver() {
continue
default:
log.Printf("Kafka Consumer buffer is full, dropping a metric." +
" You may want to increase the point_buffer setting")
" You may want to increase the metric_buffer setting")
}
}

Expand All @@ -166,10 +171,10 @@ func (k *Kafka) Stop() {
func (k *Kafka) Gather(acc telegraf.Accumulator) error {
k.Lock()
defer k.Unlock()
npoints := len(k.metricC)
for i := 0; i < npoints; i++ {
point := <-k.metricC
acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time())
nmetrics := len(k.metricC)
for i := 0; i < nmetrics; i++ {
metric := <-k.metricC
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
return nil
}
Expand Down
48 changes: 48 additions & 0 deletions plugins/inputs/mqtt_consumer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# MQTT Consumer Input Plugin

The [MQTT](http://mqtt.org/) consumer plugin reads from
specified MQTT topics and adds messages to InfluxDB.
The plugin expects messages in the
[Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md).

### Configuration:

```toml
# Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
servers = ["localhost:1883"]
### MQTT QoS, must be 0, 1, or 2
qos = 0

### Topics to subscribe to
topics = [
"telegraf/host01/cpu",
"telegraf/+/mem",
"sensors/#",
]

### Maximum number of metrics to buffer between collection intervals
metric_buffer = 100000

### username and password to connect MQTT server.
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"

### Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
### Use SSL but skip chain & host verification
# insecure_skip_verify = false

### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
data_format = "influx"
```

### Tags:

- All measurements are tagged with the incoming topic, ie
`topic=telegraf/host01/cpu`
Loading

0 comments on commit 8cefba9

Please sign in to comment.