Skip to content

Commit

Permalink
Add support for precision in http_listener (influxdata#2644)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored Apr 10, 2017
1 parent 07c428e commit 62b5c1f
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ be deprecated eventually.
- [#2597](https://github.com/influxdata/telegraf/issues/2597): Add support for Linux sysctl-fs metrics.
- [#2425](https://github.com/influxdata/telegraf/pull/2425): Support to include/exclude docker container labels as tags
- [#1667](https://github.com/influxdata/telegraf/pull/1667): dmcache input plugin
- [#2637](https://github.com/influxdata/telegraf/issues/2637): Add support for precision in http_listener

### Bugfixes

Expand Down
50 changes: 47 additions & 3 deletions metric/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"fmt"
"strconv"
"time"

"github.com/influxdata/telegraf"
Expand Down Expand Up @@ -40,10 +41,18 @@ const (
)

func Parse(buf []byte) ([]telegraf.Metric, error) {
return ParseWithDefaultTime(buf, time.Now())
return ParseWithDefaultTimePrecision(buf, time.Now(), "")
}

func ParseWithDefaultTime(buf []byte, t time.Time) ([]telegraf.Metric, error) {
return ParseWithDefaultTimePrecision(buf, t, "")
}

func ParseWithDefaultTimePrecision(
buf []byte,
t time.Time,
precision string,
) ([]telegraf.Metric, error) {
if len(buf) == 0 {
return []telegraf.Metric{}, nil
}
Expand All @@ -63,7 +72,7 @@ func ParseWithDefaultTime(buf []byte, t time.Time) ([]telegraf.Metric, error) {
continue
}

m, err := parseMetric(buf[i:i+j], t)
m, err := parseMetric(buf[i:i+j], t, precision)
if err != nil {
i += j + 1 // increment i past the previous newline
errStr += " " + err.Error()
Expand All @@ -80,7 +89,10 @@ func ParseWithDefaultTime(buf []byte, t time.Time) ([]telegraf.Metric, error) {
return metrics, nil
}

func parseMetric(buf []byte, defaultTime time.Time) (telegraf.Metric, error) {
func parseMetric(buf []byte,
defaultTime time.Time,
precision string,
) (telegraf.Metric, error) {
var dTime string
// scan the first block which is measurement[,tag1=value1,tag2=value=2...]
pos, key, err := scanKey(buf, 0)
Expand Down Expand Up @@ -114,9 +126,23 @@ func parseMetric(buf []byte, defaultTime time.Time) (telegraf.Metric, error) {
return nil, err
}

// apply precision multiplier
var nsec int64
multiplier := getPrecisionMultiplier(precision)
if multiplier > 1 {
tsint, err := parseIntBytes(ts, 10, 64)
if err != nil {
return nil, err
}

nsec := multiplier * tsint
ts = []byte(strconv.FormatInt(nsec, 10))
}

m := &metric{
fields: fields,
t: ts,
nsec: nsec,
}

// parse out the measurement name
Expand Down Expand Up @@ -628,3 +654,21 @@ func makeError(reason string, buf []byte, i int) error {
return fmt.Errorf("metric parsing error, reason: [%s], buffer: [%s], index: [%d]",
reason, buf, i)
}

// getPrecisionMultiplier will return a multiplier for the precision specified.
func getPrecisionMultiplier(precision string) int64 {
d := time.Nanosecond
switch precision {
case "u":
d = time.Microsecond
case "ms":
d = time.Millisecond
case "s":
d = time.Second
case "m":
d = time.Minute
case "h":
d = time.Hour
}
return int64(d)
}
21 changes: 21 additions & 0 deletions metric/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,27 @@ func TestParseNegativeTimestamps(t *testing.T) {
}
}

func TestParsePrecision(t *testing.T) {
for _, tt := range []struct {
line string
precision string
expected int64
}{
{"test v=42 1491847420", "s", 1491847420000000000},
{"test v=42 1491847420123", "ms", 1491847420123000000},
{"test v=42 1491847420123456", "u", 1491847420123456000},
{"test v=42 1491847420123456789", "ns", 1491847420123456789},

{"test v=42 1491847420123456789", "1s", 1491847420123456789},
{"test v=42 1491847420123456789", "asdf", 1491847420123456789},
} {
metrics, err := ParseWithDefaultTimePrecision(
[]byte(tt.line+"\n"), time.Now(), tt.precision)
assert.NoError(t, err, tt)
assert.Equal(t, tt.expected, metrics[0].UnixNano())
}
}

func TestParseMaxKeyLength(t *testing.T) {
key := ""
for {
Expand Down
11 changes: 9 additions & 2 deletions plugins/inputs/http_listener/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,18 @@

The HTTP listener is a service input plugin that listens for messages sent via HTTP POST.
The plugin expects messages in the InfluxDB line-protocol ONLY, other Telegraf input data formats are not supported.
The intent of the plugin is to allow Telegraf to serve as a proxy/router for the /write endpoint of the InfluxDB HTTP API.
The intent of the plugin is to allow Telegraf to serve as a proxy/router for the `/write` endpoint of the InfluxDB HTTP API.

The `/write` endpoint supports the `precision` query parameter and can be set to one of `ns`, `u`, `ms`, `s`, `m`, `h`. All other parameters are ignored and defer to the output plugins configuration.

When chaining Telegraf instances using this plugin, CREATE DATABASE requests receive a 200 OK response with message body `{"results":[]}` but they are not relayed. The output configuration of the Telegraf instance which ultimately submits data to InfluxDB determines the destination database.

See: [Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#influx).
Example: curl -i -XPOST 'http://localhost:8186/write' --data-binary 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000'

**Example:**
```
curl -i -XPOST 'http://localhost:8186/write' --data-binary 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000'
```

### Configuration:

Expand Down
12 changes: 7 additions & 5 deletions plugins/inputs/http_listener/http_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,12 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
}
now := time.Now()

precision := req.URL.Query().Get("precision")

// Handle gzip request bodies
body := req.Body
var err error
if req.Header.Get("Content-Encoding") == "gzip" {
var err error
body, err = gzip.NewReader(req.Body)
defer body.Close()
if err != nil {
Expand Down Expand Up @@ -263,7 +265,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {

if err == io.ErrUnexpectedEOF {
// finished reading the request body
if err := h.parse(buf[:n+bufStart], now); err != nil {
if err := h.parse(buf[:n+bufStart], now, precision); err != nil {
log.Println("E! " + err.Error())
return400 = true
}
Expand All @@ -288,7 +290,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
bufStart = 0
continue
}
if err := h.parse(buf[:i+1], now); err != nil {
if err := h.parse(buf[:i+1], now, precision); err != nil {
log.Println("E! " + err.Error())
return400 = true
}
Expand All @@ -301,8 +303,8 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
}
}

func (h *HTTPListener) parse(b []byte, t time.Time) error {
metrics, err := h.parser.ParseWithDefaultTime(b, t)
func (h *HTTPListener) parse(b []byte, t time.Time, precision string) error {
metrics, err := h.parser.ParseWithDefaultTimePrecision(b, t, precision)

for _, m := range metrics {
h.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
Expand Down
19 changes: 19 additions & 0 deletions plugins/inputs/http_listener/http_listener_test.go

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions plugins/parsers/influx/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ type InfluxParser struct {
DefaultTags map[string]string
}

func (p *InfluxParser) ParseWithDefaultTime(buf []byte, t time.Time) ([]telegraf.Metric, error) {
func (p *InfluxParser) ParseWithDefaultTimePrecision(buf []byte, t time.Time, precision string) ([]telegraf.Metric, error) {
if !bytes.HasSuffix(buf, []byte("\n")) {
buf = append(buf, '\n')
}
// parse even if the buffer begins with a newline
buf = bytes.TrimPrefix(buf, []byte("\n"))
metrics, err := metric.ParseWithDefaultTime(buf, t)
metrics, err := metric.ParseWithDefaultTimePrecision(buf, t, precision)
if len(p.DefaultTags) > 0 {
for _, m := range metrics {
for k, v := range p.DefaultTags {
Expand All @@ -41,7 +41,7 @@ func (p *InfluxParser) ParseWithDefaultTime(buf []byte, t time.Time) ([]telegraf
// a non-nil error will be returned in addition to the metrics that parsed
// successfully.
func (p *InfluxParser) Parse(buf []byte) ([]telegraf.Metric, error) {
return p.ParseWithDefaultTime(buf, time.Now())
return p.ParseWithDefaultTimePrecision(buf, time.Now(), "")
}

func (p *InfluxParser) ParseLine(line string) (telegraf.Metric, error) {
Expand Down

0 comments on commit 62b5c1f

Please sign in to comment.