Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] MQTT Consumer Input plugin #676

Merged
merged 1 commit into from
Feb 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
## v0.10.3 [unreleased]

### Release Notes
- Users of the `exec` and `kafka_consumer` can now specify the incoming data
- Users of the `exec` and `kafka_consumer` (and the new `nats_consumer`
and `mqtt_consumer` plugins) can now specify the incoming data
format that they would like to parse. Currently supports: "json", "influx", and
"graphite"
- More info on parsing arbitrary data formats can be found
Expand All @@ -12,6 +13,7 @@ format that they would like to parse. Currently supports: "json", "influx", and
- [#655](https://github.com/influxdata/telegraf/pull/655): Support parsing arbitrary data formats. Currently limited to kafka_consumer and exec inputs.
- [#671](https://github.com/influxdata/telegraf/pull/671): Dovecot input plugin. Thanks @mikif70!
- [#680](https://github.com/influxdata/telegraf/pull/680): NATS consumer input plugin. Thanks @netixen!
- [#676](https://github.com/influxdata/telegraf/pull/676): MQTT consumer input plugin.

### Bugfixes
- [#443](https://github.com/influxdata/telegraf/issues/443): Fix Ping command timeout parameter on Linux.
Expand Down
3 changes: 1 addition & 2 deletions Godeps
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git dbd8d5c40a582eb9adacde36b47932b3a3ad0034
git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git 617c801af238c3af2d9e72c5d4a0f02edad03ce5
github.com/Shopify/sarama d37c73f2b2bce85f7fa16b6a550d26c5372892ef
github.com/Sirupsen/logrus f7f79f729e0fbe2fcc061db48a9ba0263f588252
github.com/amir/raidman 6a8e089bbe32e6b907feae5ba688841974b3c339
Expand Down Expand Up @@ -30,7 +30,6 @@ github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b
github.com/naoina/toml 751171607256bb66e64c9f0220c00662420c38e9
github.com/nats-io/nats 6a83f1a633cfbfd90aa648ac99fb38c06a8b40df
github.com/nsqio/go-nsq 2118015c120962edc5d03325c680daf3163a8b5f
github.com/pborman/uuid dee7705ef7b324f27ceb85a121c61f2c2e8ce988
github.com/pmezard/go-difflib 792786c7400a136282c1664665ae0a8db921c6c2
github.com/prometheus/client_golang 67994f177195311c3ea3d4407ed0175e34a4256f
github.com/prometheus/client_model fa8ad6fec33561be4280a8f0514318c79d7f6cb6
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ Currently implemented sources:
Telegraf can also collect metrics via the following service plugins:

* statsd
* mqtt_consumer
* kafka_consumer
* nats_consumer
* github_webhooks
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/EXAMPLE_README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ The example plugin gathers metrics about example things

### Configuration:

```
```toml
# Description
[[inputs.example]]
# SampleConfig
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/mailchimp"
_ "github.com/influxdata/telegraf/plugins/inputs/memcached"
_ "github.com/influxdata/telegraf/plugins/inputs/mongodb"
_ "github.com/influxdata/telegraf/plugins/inputs/mqtt_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/mysql"
_ "github.com/influxdata/telegraf/plugins/inputs/nats_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/nginx"
Expand Down
31 changes: 24 additions & 7 deletions plugins/inputs/kafka_consumer/README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,34 @@
# Kafka Consumer
# Kafka Consumer Input Plugin

The [Kafka](http://kafka.apache.org/) consumer plugin polls a specified Kafka
topic and adds messages to InfluxDB. The plugin assumes messages follow the
line protocol. [Consumer Group](http://godoc.org/github.com/wvanbergen/kafka/consumergroup)
is used to talk to the Kafka cluster so multiple instances of telegraf can read
from the same topic in parallel.

## Configuration

```toml
# Read metrics from Kafka topic(s)
[[inputs.kafka_consumer]]
### topic(s) to consume
topics = ["telegraf"]
### an array of Zookeeper connection strings
zookeeper_peers = ["localhost:2181"]
### the name of the consumer group
consumer_group = "telegraf_metrics_consumers"
### Maximum number of metrics to buffer between collection intervals
metric_buffer = 100000
### Offset (must be either "oldest" or "newest")
offset = "oldest"

### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
data_format = "influx"
```

## Testing

Running integration tests requires running Zookeeper & Kafka. The following
Expand All @@ -16,9 +39,3 @@ To start Kafka & Zookeeper:
```
docker run -d -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`boot2docker ip || docker-machine ip <your_machine_name>` --env ADVERTISED_PORT=9092 spotify/kafka
```

To run tests:

```
go test
```
33 changes: 19 additions & 14 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ type Kafka struct {
Topics []string
ZookeeperPeers []string
Consumer *consumergroup.ConsumerGroup
PointBuffer int
Offset string
MetricBuffer int
// TODO remove PointBuffer, legacy support
PointBuffer int
Offset string

parser parsers.Parser

Expand All @@ -30,7 +32,7 @@ type Kafka struct {
in <-chan *sarama.ConsumerMessage
// channel for all kafka consumer errors
errs <-chan *sarama.ConsumerError
// channel for all incoming parsed kafka points
// channel for all incoming parsed kafka metrics
metricC chan telegraf.Metric
done chan struct{}

Expand All @@ -46,8 +48,8 @@ var sampleConfig = `
zookeeper_peers = ["localhost:2181"]
### the name of the consumer group
consumer_group = "telegraf_metrics_consumers"
### Maximum number of points to buffer between collection intervals
point_buffer = 100000
### Maximum number of metrics to buffer between collection intervals
metric_buffer = 100000
### Offset (must be either "oldest" or "newest")
offset = "oldest"

Expand Down Expand Up @@ -104,10 +106,13 @@ func (k *Kafka) Start() error {
}

k.done = make(chan struct{})
if k.PointBuffer == 0 {
k.PointBuffer = 100000
if k.PointBuffer == 0 && k.MetricBuffer == 0 {
k.MetricBuffer = 100000
} else if k.PointBuffer > 0 {
// Legacy support of PointBuffer field TODO remove
k.MetricBuffer = k.PointBuffer
}
k.metricC = make(chan telegraf.Metric, k.PointBuffer)
k.metricC = make(chan telegraf.Metric, k.MetricBuffer)

// Start the kafka message reader
go k.receiver()
Expand All @@ -128,7 +133,7 @@ func (k *Kafka) receiver() {
case msg := <-k.in:
metrics, err := k.parser.Parse(msg.Value)
if err != nil {
log.Printf("Could not parse kafka message: %s, error: %s",
log.Printf("KAFKA PARSE ERROR\nmessage: %s\nerror: %s",
string(msg.Value), err.Error())
}

Expand All @@ -139,7 +144,7 @@ func (k *Kafka) receiver() {
continue
default:
log.Printf("Kafka Consumer buffer is full, dropping a metric." +
" You may want to increase the point_buffer setting")
" You may want to increase the metric_buffer setting")
}
}

Expand All @@ -166,10 +171,10 @@ func (k *Kafka) Stop() {
func (k *Kafka) Gather(acc telegraf.Accumulator) error {
k.Lock()
defer k.Unlock()
npoints := len(k.metricC)
for i := 0; i < npoints; i++ {
point := <-k.metricC
acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time())
nmetrics := len(k.metricC)
for i := 0; i < nmetrics; i++ {
metric := <-k.metricC
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
return nil
}
Expand Down
48 changes: 48 additions & 0 deletions plugins/inputs/mqtt_consumer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# MQTT Consumer Input Plugin

The [MQTT](http://mqtt.org/) consumer plugin reads from
specified MQTT topics and adds messages to InfluxDB.
The plugin expects messages in the
[Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md).

### Configuration:

```toml
# Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
servers = ["localhost:1883"]
### MQTT QoS, must be 0, 1, or 2
qos = 0

### Topics to subscribe to
topics = [
"telegraf/host01/cpu",
"telegraf/+/mem",
"sensors/#",
]

### Maximum number of metrics to buffer between collection intervals
metric_buffer = 100000

### username and password to connect MQTT server.
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"

### Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
### Use SSL but skip chain & host verification
# insecure_skip_verify = false

### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
data_format = "influx"
```

### Tags:

- All measurements are tagged with the incoming topic, ie
`topic=telegraf/host01/cpu`
Loading