Skip to content

Commit

Permalink
http listener refactor
Browse files Browse the repository at this point in the history
in this commit:

- chunks out the http request body to avoid making very large
  allocations.
- establishes a limit for the maximum http request body size that the
  listener will accept.
- utilizes a pool of byte buffers to reduce GC pressure.
  • Loading branch information
sparrc committed Oct 18, 2016
1 parent 91f48e7 commit db2579f
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 35 deletions.
34 changes: 34 additions & 0 deletions plugins/inputs/http_listener/bufferpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package http_listener

type pool struct {
buffers chan []byte
}

func NewPool(n int) *pool {
p := &pool{
buffers: make(chan []byte, n),
}
for i := 0; i < n; i++ {
p.buffers <- make([]byte, MAX_LINE_SIZE)
}
return p
}

func (p *pool) get() []byte {
select {
case b := <-p.buffers:
return b
default:
// pool is empty, so make a new buffer
return make([]byte, MAX_LINE_SIZE)
}
}

func (p *pool) put(b []byte) {
select {
case p.buffers <- b:
default:
// the pool is full, so drop this buffer
b = nil
}
}
183 changes: 148 additions & 35 deletions plugins/inputs/http_listener/http_listener.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package http_listener

import (
"bufio"
"bytes"
"fmt"
"compress/gzip"
"io"
"log"
"net"
"net/http"
Expand All @@ -17,27 +17,45 @@ import (
"github.com/influxdata/telegraf/plugins/parsers"
)

const (
// DEFAULT_REQUEST_BODY_MAX is the default maximum request body size, in bytes.
// if the request body is over this size, we will return an HTTP 413 error.
// 512 MB
DEFAULT_REQUEST_BODY_MAX = 512 * 1000 * 1000

// MAX_LINE_SIZE is the maximum size, in bytes, that can be allocated for
// a single InfluxDB point.
// 1 MB
MAX_LINE_SIZE = 1 * 1000 * 1000
)

type HttpListener struct {
ServiceAddress string
ReadTimeout internal.Duration
WriteTimeout internal.Duration
MaxBodySize int64

sync.Mutex
wg sync.WaitGroup

listener *stoppableListener.StoppableListener

parser parsers.Parser
acc telegraf.Accumulator
pool *pool
}

const sampleConfig = `
## Address and port to host HTTP listener on
service_address = ":8186"
## timeouts
## maximum duration before timing out read of the request
read_timeout = "10s"
## maximum duration before timing out write of the response
write_timeout = "10s"
## Maximum allowed http request body size in bytes.
## 0 means to use the default of 1,000,000,000 bytes (1 gigabyte)
max_body_size = 0
`

func (t *HttpListener) SampleConfig() string {
Expand All @@ -61,7 +79,12 @@ func (t *HttpListener) Start(acc telegraf.Accumulator) error {
t.Lock()
defer t.Unlock()

if t.MaxBodySize == 0 {
t.MaxBodySize = DEFAULT_REQUEST_BODY_MAX
}

t.acc = acc
t.pool = NewPool(100)

var rawListener, err = net.Listen("tcp", t.ServiceAddress)
if err != nil {
Expand All @@ -87,8 +110,6 @@ func (t *HttpListener) Stop() {
t.listener.Stop()
t.listener.Close()

t.wg.Wait()

log.Println("I! Stopped HTTP listener service on ", t.ServiceAddress)
}

Expand All @@ -111,37 +132,9 @@ func (t *HttpListener) httpListen() error {
}

func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
t.wg.Add(1)
defer t.wg.Done()

switch req.URL.Path {
case "/write":
var http400msg bytes.Buffer
var partial string
scanner := bufio.NewScanner(req.Body)
scanner.Buffer([]byte(""), 128*1024)
for scanner.Scan() {
metrics, err := t.parser.Parse(scanner.Bytes())
if err == nil {
for _, m := range metrics {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
partial = "partial write: "
} else {
http400msg.WriteString(err.Error() + " ")
}
}

if err := scanner.Err(); err != nil {
http.Error(res, "Internal server error: "+err.Error(), http.StatusInternalServerError)
} else if http400msg.Len() > 0 {
res.Header().Set("Content-Type", "application/json")
res.Header().Set("X-Influxdb-Version", "1.0")
res.WriteHeader(http.StatusBadRequest)
res.Write([]byte(fmt.Sprintf(`{"error":"%s%s"}`, partial, http400msg.String())))
} else {
res.WriteHeader(http.StatusNoContent)
}
t.serveWrite(res, req)
case "/query":
// Deliver a dummy response to the query endpoint, as some InfluxDB
// clients test endpoint availability with a query
Expand All @@ -158,6 +151,126 @@ func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
}
}

func (t *HttpListener) serveWrite(res http.ResponseWriter, req *http.Request) {
// Check that the content length is not too large for us to handle.
if req.ContentLength > t.MaxBodySize {
toolarge(res)
return
}

// Handle gzip request bodies
var body io.ReadCloser
if req.Header.Get("Content-Encoding") == "gzip" {
r, err := gzip.NewReader(req.Body)
defer r.Close()
if err != nil {
log.Println("E! " + err.Error())
badrequest(res)
return
}
body = http.MaxBytesReader(res, r, t.MaxBodySize)
} else {
body = http.MaxBytesReader(res, req.Body, t.MaxBodySize)
}

var return400 bool
var buf []byte
var nextbuf []byte
bufi := 0
for {
if len(nextbuf) == 0 {
buf = t.pool.get()
} else {
buf = nextbuf
}
n, err := io.ReadFull(body, buf[bufi:])

if err != nil && err != io.ErrUnexpectedEOF {
log.Println("E! " + err.Error())
// problem reading the request body
badrequest(res)
t.pool.put(buf)
return
}

if err == io.ErrUnexpectedEOF || n == 0 {
// finished reading the request body
if err := t.parse(buf[:n+bufi]); err != nil {
log.Println("E! " + err.Error())
return400 = true
}
t.pool.put(buf)
if return400 {
badrequest(res)
} else {
res.WriteHeader(http.StatusNoContent)
}
return
}

i := bytes.LastIndexByte(buf, '\n')
if i == -1 {
newlinei := findnewline(body)
log.Printf("E! http_listener received a single line of %d bytes, maximum is %d bytes",
MAX_LINE_SIZE+newlinei, MAX_LINE_SIZE)
return400 = true
continue
}
if err := t.parse(buf[:i]); err != nil {
log.Println("E! " + err.Error())
return400 = true
}
nextbuf = t.pool.get()
copy(nextbuf[:len(buf)-i], buf[i:])
bufi = len(buf) - i
t.pool.put(buf)
}
}

func (t *HttpListener) parse(b []byte) error {
metrics, err := t.parser.Parse(b)

for _, m := range metrics {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}

return err
}

func toolarge(res http.ResponseWriter) {
res.Header().Set("Content-Type", "application/json")
res.Header().Set("X-Influxdb-Version", "1.0")
res.WriteHeader(http.StatusRequestEntityTooLarge)
res.Write([]byte(`{"error":"http: request body too large"}`))
}

func badrequest(res http.ResponseWriter) {
res.Header().Set("Content-Type", "application/json")
res.Header().Set("X-Influxdb-Version", "1.0")
res.WriteHeader(http.StatusBadRequest)
res.Write([]byte(`{"error":"http: bad request"}`))
}

// findnewline finds the next newline in the given reader. It returns the number
// of bytes it had to read to get there.
func findnewline(r io.Reader) int {
// drop any line longer than the max buffer size
counter := 0
// read until the next newline:
var tmp [1]byte
for {
_, err := r.Read(tmp[:])
if err != nil {
break
}
counter++
if tmp[0] == '\n' {
break
}
}
return counter
}

func init() {
inputs.Add("http_listener", func() telegraf.Input {
return &HttpListener{}
Expand Down

0 comments on commit db2579f

Please sign in to comment.