Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewriting Riemann output plugin #1900

Merged
merged 13 commits into from
Jan 27, 2017
2 changes: 1 addition & 1 deletion Godeps
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
github.com/Shopify/sarama 8aadb476e66ca998f2f6bb3c993e9a2daa3666b9
github.com/Sirupsen/logrus 219c8cb75c258c552e999735be6df753ffc7afdc
github.com/aerospike/aerospike-client-go 7f3a312c3b2a60ac083ec6da296091c52c795c63
github.com/amir/raidman 53c1b967405155bfc8758557863bf2e14f814687
github.com/amir/raidman c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985
github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857
github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4
github.com/cenkalti/backoff 4dc77674aceaabba2c7e3da25d4c823edfb73f99
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ docker-run:
docker run --name redis -p "6379:6379" -d redis
docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
docker run --name riemann -p "5555:5555" -d blalor/riemann
docker run --name riemann -p "5555:5555" -d stealthly/docker-riemann
docker run --name nats -p "4222:4222" -d nats

# Run docker containers necessary for CircleCI unit tests
Expand All @@ -71,7 +71,7 @@ docker-run-circle:
-d spotify/kafka
docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
docker run --name riemann -p "5555:5555" -d blalor/riemann
docker run --name riemann -p "5555:5555" -d stealthly/docker-riemann
docker run --name nats -p "4222:4222" -d nats

# Kill all docker containers, ignore errors
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ want to add support for another service or third-party API.
* [opentsdb](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/opentsdb)
* [prometheus](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/prometheus_client)
* [riemann](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/riemann)
* [riemann_legacy](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/riemann_legacy)

## Contributing

Expand Down
36 changes: 35 additions & 1 deletion etc/telegraf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -430,8 +430,42 @@
# # listen = ":9126"


# # Configuration for the Riemann server to send metrics to
# # Configuration for Riemann server to send metrics to
# [[outputs.riemann]]
# ## Address of the Riemann server
# address = "localhost:5555"
#
# ## Transport protocol to use, either tcp or udp
# transport = "tcp"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you just make the scheme part of the address string?

ie:

  address = "tcp://localhost:5555"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could, but then I'd have to split/parse it to take it apart again anyway, since the Riemann library needs the protocol and address as separate parameters. address is not used anywhere else otherwise. It's not a complicated thing to do, but I don't see the benefit of it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

two benefits:

  1. we can use url.Parse to validate the provided URL
  2. it's more consistent with the configuration of other plugins

#
# ## Riemann TTL, floating-point time in seconds.
# ## Defines how long that an event is considered valid for in Riemann
# # ttl = 30.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is a "Riemann TTL"? Can you link to documentation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant a Riemann event TTL. It's described here: http://riemann.io/concepts.html
I clarified the description text a bit.

#
# ## Separator to use between measurement and field name in Riemann service name
# separator = "/"
#
# ## Set measurement name as a Riemann attribute,
# ## instead of prepending it to the Riemann service name
# # measurement_as_attribute = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you provide more details on what this means? can you provide an example? How does using this option affect the separator option?

Copy link
Contributor Author

@JamesClonk JamesClonk Jan 25, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to clarify the description text a bit. I also added an example Riemann event with this setting to the plugins Readme.md.

#
# ## Send string metrics as Riemann event states.
# ## Unless enabled all string metrics will be ignored
# # string_as_state = false
#
# ## A list of tag keys whose values get sent as Riemann tags.
# ## If empty, all Telegraf tag values will be sent as tags
# # tag_keys = ["telegraf","custom_tag"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

weren't we going to send tags as riemann "attributes" instead of as tags? is that what you mean by "Riemann tags"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, Telegraf/Influx/Metrics2.0 tags will be sent as Riemann attributes, which are key/value pairs. But Riemann also has a notion of "Riemann tags" which are just single string thingies. (Kinda like in Graphite I guess?)
This option allows setting such additional tags.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, fair enough

#
# ## Additional Riemann tags to send.
# # tags = ["telegraf-output"]
#
# ## Description for Riemann event
# # description_text = "metrics collected from telegraf"


# # Configuration for the legacy Riemann plugin
# [[outputs.riemann_legacy]]
# ## URL of server
# url = "localhost:5555"
# ## transport protocol to use either tcp or udp
Expand Down
1 change: 1 addition & 0 deletions plugins/outputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/opentsdb"
_ "github.com/influxdata/telegraf/plugins/outputs/prometheus_client"
_ "github.com/influxdata/telegraf/plugins/outputs/riemann"
_ "github.com/influxdata/telegraf/plugins/outputs/riemann_legacy"
)
55 changes: 55 additions & 0 deletions plugins/outputs/riemann/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Riemann Output Plugin

This plugin writes to [Riemann](http://riemann.io/) via TCP or UDP.

### Configuration:

```toml
# Configuration for Riemann to send metrics to
[[outputs.riemann]]
## Address of the Riemann server
address = "localhost:5555"

## Transport protocol to use, either tcp or udp
transport = "tcp"

## Riemann TTL, floating-point time in seconds.
## Defines how long that an event is considered valid for in Riemann
# ttl = 30.0

## Separator to use between measurement and field name in Riemann service name
separator = "/"

## Set measurement name as a Riemann attribute,
## instead of prepending it to the Riemann service name
# measurement_as_attribute = false

## Send string metrics as Riemann event states.
## Unless enabled all string metrics will be ignored
# string_as_state = false

## A list of tag keys whose values get sent as Riemann tags.
## If empty, all Telegraf tag values will be sent as tags
# tag_keys = ["telegraf","custom_tag"]

## Additional Riemann tags to send.
# tags = ["telegraf-output"]

## Description for Riemann event
# description_text = "metrics collected from telegraf"
```

### Required parameters:

* `address`: Address of the Riemann server to send Riemann events to.
* `transport`: Transport protocol to use, must be either tcp or udp.

### Optional parameters:

* `ttl`: Riemann event TTL, floating-point time in seconds. Defines how long that an event is considered valid for in Riemann.
* `separator`: Separator to use between measurement and field name in Riemann service name.
* `measurement_as_attribute`: Set measurement name as a Riemann attribute, instead of prepending it to the Riemann service name.
* `string_as_state`: Send string metrics as Riemann event states. If this is not enabled then all string metrics will be ignored.
* `tag_keys`: A list of tag keys whose values get sent as Riemann tags. If empty, all Telegraf tag values will be sent as tags.
* `tags`: Additional Riemann tags that will be sent.
* `description_text`: Description text for Riemann event.
174 changes: 117 additions & 57 deletions plugins/outputs/riemann/riemann.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,44 +12,70 @@ import (
"github.com/influxdata/telegraf/plugins/outputs"
)

const deprecationMsg = "I! WARNING: this Riemann output plugin will be deprecated in a future release, see https://github.com/influxdata/telegraf/issues/1878 for more details & discussion."

type Riemann struct {
URL string
Transport string
Separator string
Address string
Transport string
TTL float32
Separator string
MeasurementAsAttribute bool
StringAsState bool
TagKeys []string
Tags []string
DescriptionText string

client *raidman.Client
}

var sampleConfig = `
## URL of server
url = "localhost:5555"
## transport protocol to use either tcp or udp
## Address of the Riemann server
address = "localhost:5555"

## Transport protocol to use, either tcp or udp
transport = "tcp"
## separator to use between input name and field name in Riemann service name
separator = " "

## Riemann TTL, floating-point time in seconds.
## Defines how long that an event is considered valid for in Riemann
# ttl = 30.0

## Separator to use between measurement and field name in Riemann service name
separator = "/"

## Set measurement name as a Riemann attribute,
## instead of prepending it to the Riemann service name
# measurement_as_attribute = false

## Send string metrics as Riemann event states.
## Unless enabled all string metrics will be ignored
# string_as_state = false

## A list of tag keys whose values get sent as Riemann tags.
## If empty, all Telegraf tag values will be sent as tags
# tag_keys = ["telegraf","custom_tag"]

## Additional Riemann tags to send.
# tags = ["telegraf-output"]

## Description for Riemann event
# description_text = "metrics collected from telegraf"
`

func (r *Riemann) Connect() error {
log.Printf(deprecationMsg)
c, err := raidman.Dial(r.Transport, r.URL)
client, err := raidman.Dial(r.Transport, r.Address)

if err != nil {
r.client = nil
return err
}

r.client = c
r.client = client
return nil
}

func (r *Riemann) Close() error {
if r.client == nil {
return nil
if r.client != nil {
r.client.Close()
r.client = nil
}
r.client.Close()
r.client = nil
return nil
}

Expand All @@ -62,91 +88,125 @@ func (r *Riemann) Description() string {
}

func (r *Riemann) Write(metrics []telegraf.Metric) error {
log.Printf(deprecationMsg)
if len(metrics) == 0 {
return nil
}

if r.client == nil {
err := r.Connect()
if err != nil {
return fmt.Errorf("FAILED to (re)connect to Riemann. Error: %s\n", err)
if err := r.Connect(); err != nil {
return fmt.Errorf("Failed to (re)connect to Riemann: %s", err.Error())
}
}

// build list of Riemann events to send
var events []*raidman.Event
for _, p := range metrics {
evs := buildEvents(p, r.Separator)
for _, m := range metrics {
evs := r.buildRiemannEvents(m)
for _, ev := range evs {
events = append(events, ev)
}
}

var senderr = r.client.SendMulti(events)
if senderr != nil {
r.Close() // always retuns nil
return fmt.Errorf("FAILED to send riemann message (will try to reconnect). Error: %s\n",
senderr)
if err := r.client.SendMulti(events); err != nil {
r.Close()
return fmt.Errorf("Failed to send riemann message: %s", err)
}

return nil
}

func buildEvents(p telegraf.Metric, s string) []*raidman.Event {
func (r *Riemann) buildRiemannEvents(m telegraf.Metric) []*raidman.Event {
events := []*raidman.Event{}
for fieldName, value := range p.Fields() {
host, ok := p.Tags()["host"]
for fieldName, value := range m.Fields() {
// get host for Riemann event
host, ok := m.Tags()["host"]
if !ok {
hostname, err := os.Hostname()
if err != nil {
host = "unknown"
} else {
if hostname, err := os.Hostname(); err == nil {
host = hostname
} else {
host = "unknown"
}
}

event := &raidman.Event{
Host: host,
Service: serviceName(s, p.Name(), p.Tags(), fieldName),
Host: host,
Ttl: r.TTL,
Description: r.DescriptionText,
Time: m.Time().Unix(),

Attributes: r.attributes(m.Name(), m.Tags()),
Service: r.service(m.Name(), fieldName),
Tags: r.tags(m.Tags()),
}

switch value.(type) {
case string:
// only send string metrics if explicitly enabled, skip otherwise
if !r.StringAsState {
log.Printf("D! Riemann event states disabled, skipping metric value [%s]\n", value)
continue
}
event.State = value.(string)
default:
case int, int64, uint64, float32, float64:
event.Metric = value
default:
log.Printf("D! Riemann does not support metric value [%s]\n", value)
continue
}

events = append(events, event)
}

return events
}

func serviceName(s string, n string, t map[string]string, f string) string {
serviceStrings := []string{}
serviceStrings = append(serviceStrings, n)
func (r *Riemann) attributes(name string, tags map[string]string) map[string]string {
if r.MeasurementAsAttribute {
tags["measurement"] = name
}

delete(tags, "host") // exclude 'host' tag
return tags
}

// we'll skip the 'host' tag
tagStrings := []string{}
tagNames := []string{}
func (r *Riemann) service(name string, field string) string {
var serviceStrings []string

for tagName := range t {
tagNames = append(tagNames, tagName)
// if measurement is not enabled as an attribute then prepend it to service name
if !r.MeasurementAsAttribute {
serviceStrings = append(serviceStrings, name)
}
sort.Strings(tagNames)
serviceStrings = append(serviceStrings, field)

for _, tagName := range tagNames {
if tagName != "host" {
tagStrings = append(tagStrings, t[tagName])
return strings.Join(serviceStrings, r.Separator)
}

func (r *Riemann) tags(tags map[string]string) []string {
// always add specified Riemann tags
values := r.Tags

// if tag_keys are specified, add those and return tag list
if len(r.TagKeys) > 0 {
for _, tagName := range r.TagKeys {
value, ok := tags[tagName]
if ok {
values = append(values, value)
}
}
return values
}
var tagString string = strings.Join(tagStrings, s)
if tagString != "" {
serviceStrings = append(serviceStrings, tagString)

// otherwise add all values from telegraf tag key/value pairs
var keys []string
for key := range tags {
keys = append(keys, key)
}
sort.Strings(keys)

for _, key := range keys {
if key != "host" { // exclude 'host' tag
values = append(values, tags[key])
}
}
serviceStrings = append(serviceStrings, f)
return strings.Join(serviceStrings, s)
return values
}

func init() {
Expand Down
Loading