Skip to content

Commit

Permalink
Kafka output producer, send telegraf metrics to Kafka brokers
Browse files Browse the repository at this point in the history
Closes #38
  • Loading branch information
sparrc committed Aug 26, 2015
1 parent 4342678 commit d1f965a
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## v0.1.7 [unreleased]

### Features
- [#38](https://github.com/influxdb/telegraf/pull/38): Kafka output sink.
- [#133](https://github.com/influxdb/telegraf/pull/133): Add plugin.Gather error logging. Thanks @nickscript0!
- [#136](https://github.com/influxdb/telegraf/issues/136): Add a -usage flag for printing usage of a single plugin.
- [#137](https://github.com/influxdb/telegraf/issues/137): Memcached: fix when a value contains a space
Expand Down
31 changes: 16 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,37 +103,38 @@ at 192.168.59.103:8086, tagging measurements with dc="denver-1". It will output
measurements at a 10s interval and will collect totalcpu & percpu data.

```
[outputs]
[outputs.influxdb]
url = "http://192.168.59.103:8086" # required.
database = "telegraf" # required.
[tags]
dc = "denver-1"
dc = "denver-1"
[agent]
interval = "10s"
interval = "10s"
# OUTPUTS
[outputs]
[outputs.influxdb]
url = "http://192.168.59.103:8086" # required.
database = "telegraf" # required.
# PLUGINS
[cpu]
percpu = true
totalcpu = true
percpu = true
totalcpu = true
```

Below is how to configure `tagpass` parameters (added in 0.1.4)

```
# Don't collect CPU data for cpu6 & cpu7
[cpu.tagdrop]
cpu = [ "cpu6", "cpu7" ]
cpu = [ "cpu6", "cpu7" ]
[disk]
[disk.tagpass]
# tagpass conditions are OR, not AND.
# If the (filesystem is ext4 or xfs) OR (the path is /opt or /home)
# then the metric passes
fstype = [ "ext4", "xfs" ]
path = [ "/opt", "/home" ]
# tagpass conditions are OR, not AND.
# If the (filesystem is ext4 or xfs) OR (the path is /opt or /home)
# then the metric passes
fstype = [ "ext4", "xfs" ]
path = [ "/opt", "/home" ]
```

## Supported Plugins
Expand Down
8 changes: 8 additions & 0 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ func (a *Agent) Connect() error {
if err != nil {
return err
}
if a.Debug {
log.Printf("Successfully connected to output: %s\n", o.name)
}
}
return nil
}
Expand Down Expand Up @@ -160,6 +163,8 @@ func (a *Agent) LoadPlugins(pluginsFilter string) ([]string, error) {
return names, nil
}

// crankParallel runs the plugins that are using the same reporting interval
// as the telegraf agent.
func (a *Agent) crankParallel() error {
points := make(chan *BatchPoints, len(a.plugins))

Expand Down Expand Up @@ -203,6 +208,7 @@ func (a *Agent) crankParallel() error {
return a.flush(&bp)
}

// crank is mostly for test purposes.
func (a *Agent) crank() error {
var bp BatchPoints

Expand All @@ -223,6 +229,8 @@ func (a *Agent) crank() error {
return a.flush(&bp)
}

// crankSeparate runs the plugins that have been configured with their own
// reporting interval.
func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error {
ticker := time.NewTicker(plugin.config.Interval)

Expand Down
3 changes: 0 additions & 3 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,8 @@ func main() {
}

shutdown := make(chan struct{})

signals := make(chan os.Signal)

signal.Notify(signals, os.Interrupt)

go func() {
<-signals
close(shutdown)
Expand Down
2 changes: 1 addition & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ var header = `# Telegraf configuration
# debug = false
# hostname = "prod3241"
###############################################################################
# OUTPUTS #
###############################################################################
Expand All @@ -368,7 +369,6 @@ var header2 = `
###############################################################################
# PLUGINS #
###############################################################################
`

// PrintSampleConfig prints the sample config!
Expand Down
1 change: 1 addition & 0 deletions outputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ package all
import (
_ "github.com/influxdb/telegraf/outputs/datadog"
_ "github.com/influxdb/telegraf/outputs/influxdb"
_ "github.com/influxdb/telegraf/outputs/kafka"
)
18 changes: 4 additions & 14 deletions outputs/datadog/datadog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"testing"
"time"

"github.com/influxdb/telegraf/testutil"

"github.com/influxdb/influxdb/client"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -25,18 +27,6 @@ func fakeDatadog() *Datadog {
return d
}

func testData() client.BatchPoints {
var bp client.BatchPoints
bp.Time = time.Now()
bp.Tags = map[string]string{"tag1": "value1"}
bp.Points = []client.Point{
{
Fields: map[string]interface{}{"value": 1.0},
},
}
return bp
}

func TestUriOverride(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
Expand All @@ -48,7 +38,7 @@ func TestUriOverride(t *testing.T) {
d.Apikey = "123456"
err := d.Connect()
require.NoError(t, err)
err = d.Write(testData())
err = d.Write(testutil.MockBatchPoints())
require.NoError(t, err)
}

Expand All @@ -67,7 +57,7 @@ func TestBadStatusCode(t *testing.T) {
d.Apikey = "123456"
err := d.Connect()
require.NoError(t, err)
err = d.Write(testData())
err = d.Write(testutil.MockBatchPoints())
if err == nil {
t.Errorf("error expected but none returned")
} else {
Expand Down
91 changes: 91 additions & 0 deletions outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package kafka

import (
"errors"
"fmt"

"github.com/Shopify/sarama"
"github.com/influxdb/influxdb/client"
"github.com/influxdb/telegraf/outputs"
)

type Kafka struct {
// Kafka brokers to send metrics to
Brokers []string
// Kafka topic
Topic string

producer sarama.SyncProducer
}

var sampleConfig = `
# URLs of kafka brokers
brokers = ["localhost:9092"]
# Kafka topic for producer messages
topic = "telegraf"
`

func (k *Kafka) Connect() error {
producer, err := sarama.NewSyncProducer(k.Brokers, nil)
if err != nil {
return err
}
k.producer = producer
return nil
}

func (k *Kafka) Close() error {
return k.producer.Close()
}

func (k *Kafka) SampleConfig() string {
return sampleConfig
}

func (k *Kafka) Description() string {
return "Configuration for the Kafka server to send metrics to"
}

func (k *Kafka) Write(bp client.BatchPoints) error {
if len(bp.Points) == 0 {
return nil
}

for _, p := range bp.Points {
// Combine tags from Point and BatchPoints and grab the resulting
// line-protocol output string to write to Kafka
var value string
if p.Raw != "" {
value = p.Raw
} else {
for k, v := range bp.Tags {
if p.Tags == nil {
p.Tags = make(map[string]string, len(bp.Tags))
}
p.Tags[k] = v
}
value = p.MarshalString()
}

m := &sarama.ProducerMessage{
Topic: k.Topic,
Value: sarama.StringEncoder(value),
}
if h, ok := p.Tags["host"]; ok {
m.Key = sarama.StringEncoder(h)
}

_, _, err := k.producer.SendMessage(m)
if err != nil {
return errors.New(fmt.Sprintf("FAILED to send kafka message: %s\n",
err))
}
}
return nil
}

func init() {
outputs.Add("kafka", func() outputs.Output {
return &Kafka{}
})
}
28 changes: 28 additions & 0 deletions outputs/kafka/kafka_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package kafka

import (
"testing"

"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/require"
)

func TestConnectAndWrite(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

brokers := []string{testutil.GetLocalHost() + ":9092"}
k := &Kafka{
Brokers: brokers,
Topic: "Test",
}

// Verify that we can connect to the Kafka broker
err := k.Connect()
require.NoError(t, err)

// Verify that we can successfully write data to the kafka broker
err = k.Write(testutil.MockBatchPoints())
require.NoError(t, err)
}
17 changes: 17 additions & 0 deletions testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"net"
"net/url"
"os"
"time"

"github.com/influxdb/influxdb/client"
)

var localhost = "localhost"
Expand All @@ -27,3 +30,17 @@ func GetLocalHost() string {
}
return localhost
}

// MockBatchPoints returns a mock BatchPoints object for using in unit tests
// of telegraf output sinks.
func MockBatchPoints() client.BatchPoints {
var bp client.BatchPoints
bp.Time = time.Now()
bp.Tags = map[string]string{"tag1": "value1"}
bp.Points = []client.Point{
{
Fields: map[string]interface{}{"value": 1.0},
},
}
return bp
}

0 comments on commit d1f965a

Please sign in to comment.