Skip to content

Commit

Permalink
Change point_buffer to metric_buffer to conform will changes in influ…
Browse files Browse the repository at this point in the history
  • Loading branch information
netixen authored and geodimm committed Mar 10, 2016
1 parent 72ec805 commit d3884d4
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 37 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ format that they would like to parse. Currently supports: "json", "influx", and
[here](https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md)

### Features
- [#652](https://github.com/influxdata/telegraf/pull/652): CouchDB Input Plugin
- [#652](https://github.com/influxdata/telegraf/pull/652): CouchDB Input Plugin. Thanks @codehate!
- [#655](https://github.com/influxdata/telegraf/pull/655): Support parsing arbitrary data formats. Currently limited to kafka_consumer and exec inputs.
- [#671](https://github.com/influxdata/telegraf/pull/671): Dovecot input plugin.
- [#671](https://github.com/influxdata/telegraf/pull/671): Dovecot input plugin. Thanks @mikif70!
- [#680](https://github.com/influxdata/telegraf/pull/680): NATS consumer input plugin. Thanks @netixen!

### Bugfixes
- [#443](https://github.com/influxdata/telegraf/issues/443): Fix Ping command timeout parameter on Linux.
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ Telegraf can also collect metrics via the following service plugins:

* statsd
* kafka_consumer
* nats_consumer
* github_webhooks

We'll be adding support for many more over the coming months. Read on if you
Expand Down
4 changes: 2 additions & 2 deletions plugins/inputs/nats_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ from a NATS cluster in parallel.
subjects = ["telegraf"]
### name a queue group
queue_group = "telegraf_consumers"
### Maximum number of points to buffer between collection intervals
point_buffer = 100000
### Maximum number of metrics to buffer between collection intervals
metric_buffer = 100000
### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
Expand Down
28 changes: 14 additions & 14 deletions plugins/inputs/nats_consumer/nats_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ type natsConsumer struct {
Servers []string
Secure bool

PointBuffer int
parser parsers.Parser
MetricBuffer int
parser parsers.Parser

sync.Mutex
Conn *nats.Conn
Expand All @@ -39,7 +39,7 @@ type natsConsumer struct {
in chan *nats.Msg
// channel for all NATS read errors
errs chan error
// channel for all incoming parsed points
// channel for all incoming parsed metrics
metricC chan telegraf.Metric
done chan struct{}
}
Expand All @@ -53,8 +53,8 @@ var sampleConfig = `
subjects = ["telegraf"]
### name a queue group
queue_group = "telegraf_consumers"
### Maximum number of points to buffer between collection intervals
point_buffer = 100000
### Maximum number of metrics to buffer between collection intervals
metric_buffer = 100000
### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
Expand Down Expand Up @@ -115,11 +115,11 @@ func (n *natsConsumer) Start() error {
}

n.done = make(chan struct{})
if n.PointBuffer == 0 {
n.PointBuffer = 100000
if n.MetricBuffer == 0 {
n.MetricBuffer = 100000
}

n.metricC = make(chan telegraf.Metric, n.PointBuffer)
n.metricC = make(chan telegraf.Metric, n.MetricBuffer)

// Start the message reader
go n.receiver()
Expand All @@ -130,7 +130,7 @@ func (n *natsConsumer) Start() error {
}

// receiver() reads all incoming messages from NATS, and parses them into
// influxdb metric points.
// telegraf metrics.
func (n *natsConsumer) receiver() {
defer n.clean()
for {
Expand All @@ -151,7 +151,7 @@ func (n *natsConsumer) receiver() {
continue
default:
log.Printf("NATS Consumer buffer is full, dropping a metric." +
" You may want to increase the point_buffer setting")
" You may want to increase the metric_buffer setting")
}
}

Expand Down Expand Up @@ -187,10 +187,10 @@ func (n *natsConsumer) Stop() {
func (n *natsConsumer) Gather(acc telegraf.Accumulator) error {
n.Lock()
defer n.Unlock()
npoints := len(n.metricC)
for i := 0; i < npoints; i++ {
point := <-n.metricC
acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time())
nmetrics := len(n.metricC)
for i := 0; i < nmetrics; i++ {
metric := <-n.metricC
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
return nil
}
Expand Down
38 changes: 19 additions & 19 deletions plugins/inputs/nats_consumer/nats_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,26 @@ const (
testMsgGraphite = "cpu.load.short.graphite 23422 1454780029"
testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n"
invalidMsg = "cpu_load_short,host=server01 1422568543702900257"
pointBuffer = 5
metricBuffer = 5
)

func newTestNatsConsumer() (*natsConsumer, chan *nats.Msg) {
in := make(chan *nats.Msg, pointBuffer)
in := make(chan *nats.Msg, metricBuffer)
n := &natsConsumer{
QueueGroup: "test",
Subjects: []string{"telegraf"},
Servers: []string{"nats://localhost:4222"},
Secure: false,
PointBuffer: pointBuffer,
in: in,
errs: make(chan error, pointBuffer),
done: make(chan struct{}),
metricC: make(chan telegraf.Metric, pointBuffer),
QueueGroup: "test",
Subjects: []string{"telegraf"},
Servers: []string{"nats://localhost:4222"},
Secure: false,
MetricBuffer: metricBuffer,
in: in,
errs: make(chan error, metricBuffer),
done: make(chan struct{}),
metricC: make(chan telegraf.Metric, metricBuffer),
}
return n, in
}

// Test that the parser parses NATS messages into points
// Test that the parser parses NATS messages into metrics
func TestRunParser(t *testing.T) {
n, in := newTestNatsConsumer()
defer close(n.done)
Expand Down Expand Up @@ -64,24 +64,24 @@ func TestRunParserInvalidMsg(t *testing.T) {
}
}

// Test that points are dropped when we hit the buffer limit
// Test that metrics are dropped when we hit the buffer limit
func TestRunParserRespectsBuffer(t *testing.T) {
n, in := newTestNatsConsumer()
defer close(n.done)

n.parser, _ = parsers.NewInfluxParser()
go n.receiver()
for i := 0; i < pointBuffer+1; i++ {
for i := 0; i < metricBuffer+1; i++ {
in <- natsMsg(testMsg)
}
time.Sleep(time.Millisecond)

if a := len(n.metricC); a != pointBuffer {
t.Errorf("got %v, expected %v", a, pointBuffer)
if a := len(n.metricC); a != metricBuffer {
t.Errorf("got %v, expected %v", a, metricBuffer)
}
}

// Test that the parser parses nats messages into points
// Test that the parser parses line format messages into metrics
func TestRunParserAndGather(t *testing.T) {
n, in := newTestNatsConsumer()
defer close(n.done)
Expand All @@ -101,7 +101,7 @@ func TestRunParserAndGather(t *testing.T) {
map[string]interface{}{"value": float64(23422)})
}

// Test that the parser parses nats messages into points
// Test that the parser parses graphite format messages into metrics
func TestRunParserAndGatherGraphite(t *testing.T) {
n, in := newTestNatsConsumer()
defer close(n.done)
Expand All @@ -121,7 +121,7 @@ func TestRunParserAndGatherGraphite(t *testing.T) {
map[string]interface{}{"value": float64(23422)})
}

// Test that the parser parses nats messages into points
// Test that the parser parses json format messages into metrics
func TestRunParserAndGatherJSON(t *testing.T) {
n, in := newTestNatsConsumer()
defer close(n.done)
Expand Down

0 comments on commit d3884d4

Please sign in to comment.