Skip to content

Commit

Permalink
Fix InfluxDB output UDP line splitting (#5439)
Browse files Browse the repository at this point in the history
(cherry picked from commit 5dfa3fa)
  • Loading branch information
oplehto authored and danielnelson committed Feb 19, 2019
1 parent 5bbb9d3 commit 302b7df
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 2 deletions.
22 changes: 20 additions & 2 deletions plugins/outputs/influxdb/udp.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package influxdb

import (
"bufio"
"bytes"
"context"
"fmt"
"log"
Expand Down Expand Up @@ -45,9 +47,9 @@ func NewUDPClient(config *UDPConfig) (*udpClient, error) {
serializer := config.Serializer
if serializer == nil {
s := influx.NewSerializer()
s.SetMaxLineBytes(config.MaxPayloadSize)
serializer = s
}
serializer.SetMaxLineBytes(size)

dialer := config.Dialer
if dialer == nil {
Expand Down Expand Up @@ -96,7 +98,11 @@ func (c *udpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
continue
}

_, err = c.conn.Write(octets)
scanner := bufio.NewScanner(bytes.NewReader(octets))
scanner.Split(scanLines)
for scanner.Scan() {
_, err = c.conn.Write(scanner.Bytes())
}
if err != nil {
c.conn.Close()
c.conn = nil
Expand All @@ -118,3 +124,15 @@ type netDialer struct {
func (d *netDialer) DialContext(ctx context.Context, network, address string) (Conn, error) {
return d.Dialer.DialContext(ctx, network, address)
}

func scanLines(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}
if i := bytes.IndexByte(data, '\n'); i >= 0 {
// We have a full newline-terminated line.
return i + 1, data[0 : i+1], nil

}
return 0, nil, nil
}
1 change: 1 addition & 0 deletions plugins/serializers/influx/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error {
return err
}

pairsLen = 0
firstField = true
bytesNeeded = len(s.header) + len(s.pair) + len(s.footer)

Expand Down
18 changes: 18 additions & 0 deletions plugins/serializers/influx/influx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,24 @@ var tests = []struct {
),
output: []byte("cpu abc=123i 1519194109000000042\ncpu def=456i 1519194109000000042\n"),
},
{
name: "split_fields_overflow",
maxBytes: 43,
input: MustMetric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"abc": 123,
"def": 456,
"ghi": 789,
"jkl": 123,
},
time.Unix(1519194109, 42),
),
),
output: []byte("cpu abc=123i,def=456i 1519194109000000042\ncpu ghi=789i,jkl=123i 1519194109000000042\n"),
},
{
name: "name newline",
input: MustMetric(
Expand Down

0 comments on commit 302b7df

Please sign in to comment.