Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http listener refactor #1915

Merged
merged 2 commits into from
Oct 24, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ continue sending logs to /var/log/telegraf/telegraf.log.
- [#1542](https://github.com/influxdata/telegraf/pull/1542): Add filestack webhook plugin.
- [#1599](https://github.com/influxdata/telegraf/pull/1599): Add server hostname for each docker measurements.
- [#1697](https://github.com/influxdata/telegraf/pull/1697): Add NATS output plugin.
- [#1407](https://github.com/influxdata/telegraf/pull/1407): HTTP service listener input plugin.
- [#1407](https://github.com/influxdata/telegraf/pull/1407) & [#1915](https://github.com/influxdata/telegraf/pull/1915): HTTP service listener input plugin.
- [#1699](https://github.com/influxdata/telegraf/pull/1699): Add database blacklist option for Postgresql
- [#1791](https://github.com/influxdata/telegraf/pull/1791): Add Docker container state metrics to Docker input plugin output
- [#1755](https://github.com/influxdata/telegraf/issues/1755): Add support to SNMP for IP & MAC address conversion.
Expand Down
6 changes: 6 additions & 0 deletions internal/buffer/buffer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package buffer

import (
"sync"

"github.com/influxdata/telegraf"
)

Expand All @@ -11,6 +13,8 @@ type Buffer struct {
drops int
// total metrics added
total int

mu sync.Mutex
}

// NewBuffer returns a Buffer
Expand Down Expand Up @@ -61,11 +65,13 @@ func (b *Buffer) Add(metrics ...telegraf.Metric) {
// the batch will be of maximum length batchSize. It can be less than batchSize,
// if the length of Buffer is less than batchSize.
func (b *Buffer) Batch(batchSize int) []telegraf.Metric {
b.mu.Lock()
n := min(len(b.buf), batchSize)
out := make([]telegraf.Metric, n)
for i := 0; i < n; i++ {
out[i] = <-b.buf
}
b.mu.Unlock()
return out
}

Expand Down
43 changes: 43 additions & 0 deletions plugins/inputs/http_listener/bufferpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package http_listener

import (
"sync/atomic"
)

type pool struct {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can just be type pool chan []byte, otherwise you're allocating a struct whose only field is a reference type.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either that, or also include the buffer size as a field in the struct. That would make it configurable at the type level, even if that doesn't get exposed as a user-facing config option.

buffers chan []byte
size int

created int64
}

// NewPool returns a new pool object.
// n is the number of buffers
// bufSize is the size (in bytes) of each buffer
func NewPool(n, bufSize int) *pool {
return &pool{
buffers: make(chan []byte, n),
size: bufSize,
}
}

func (p *pool) get() []byte {
select {
case b := <-p.buffers:
return b
default:
atomic.AddInt64(&p.created, 1)
return make([]byte, p.size)
}
}

func (p *pool) put(b []byte) {
select {
case p.buffers <- b:
default:
}
}

func (p *pool) ncreated() int64 {
return atomic.LoadInt64(&p.created)
}
Loading