Skip to content

Commit

Permalink
http listener refactor
Browse files Browse the repository at this point in the history
in this commit:

- chunks out the http request body to avoid making very large
  allocations.
- establishes a limit for the maximum http request body size that the
  listener will accept.
- utilizes a pool of byte buffers to reduce GC pressure.
  • Loading branch information
sparrc committed Oct 21, 2016
1 parent 91f48e7 commit e666485
Show file tree
Hide file tree
Showing 7 changed files with 287 additions and 167 deletions.
6 changes: 6 additions & 0 deletions internal/buffer/buffer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package buffer

import (
"sync"

"github.com/influxdata/telegraf"
)

Expand All @@ -11,6 +13,8 @@ type Buffer struct {
drops int
// total metrics added
total int

sync.Mutex
}

// NewBuffer returns a Buffer
Expand Down Expand Up @@ -61,11 +65,13 @@ func (b *Buffer) Add(metrics ...telegraf.Metric) {
// the batch will be of maximum length batchSize. It can be less than batchSize,
// if the length of Buffer is less than batchSize.
func (b *Buffer) Batch(batchSize int) []telegraf.Metric {
b.Lock()
n := min(len(b.buf), batchSize)
out := make([]telegraf.Metric, n)
for i := 0; i < n; i++ {
out[i] = <-b.buf
}
b.Unlock()
return out
}

Expand Down
43 changes: 43 additions & 0 deletions plugins/inputs/http_listener/bufferpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package http_listener

import (
"sync/atomic"
)

type pool struct {
buffers chan []byte
size int

created int64
}

// NewPool returns a new pool object.
// n is the number of buffers
// bufSize is the size (in bytes) of each buffer
func NewPool(n, bufSize int) *pool {
return &pool{
buffers: make(chan []byte, n),
size: bufSize,
}
}

func (p *pool) get() []byte {
select {
case b := <-p.buffers:
return b
default:
atomic.AddInt64(&p.created, 1)
return make([]byte, p.size)
}
}

func (p *pool) put(b []byte) {
select {
case p.buffers <- b:
default:
}
}

func (p *pool) ncreated() int64 {
return atomic.LoadInt64(&p.created)
}
Loading

0 comments on commit e666485

Please sign in to comment.