Skip to content

Commit

Permalink
Change default statsd packet size to 1500, make configurable
Browse files Browse the repository at this point in the history
Also modifying the internal UDP listener/parser code to make it able to
handle higher load. The udp listener will no longer do any parsing or
string conversion. It will simply read UDP packets as bytes and put them
into a channel. The parser thread will now deal with splitting the UDP
metrics into separated strings.

This could probably be made even better by leaving everything as byte
arrays.

fixes #543
  • Loading branch information
sparrc committed Jan 19, 2016
1 parent 6a50fce commit c55c06e
Showing 1 changed file with 35 additions and 20 deletions.
55 changes: 35 additions & 20 deletions plugins/inputs/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/influxdb/telegraf/plugins/inputs"
)

const UDP_PACKET_SIZE int = 1500

var dropwarn = "ERROR: Message queue full. Discarding line [%s] " +
"You may want to increase allowed_pending_messages in the config\n"

Expand All @@ -37,10 +39,14 @@ type Statsd struct {
DeleteTimings bool
ConvertNames bool

// UDPPacketSize is the size of the read packets for the server listening
// for statsd UDP packets. This will default to 1500 bytes.
UDPPacketSize int `toml:udp_packet_size`

sync.Mutex

// Channel for all incoming statsd messages
in chan string
// Channel for all incoming statsd packets
in chan []byte
done chan struct{}

// Cache gauges, counters & sets so they can be aggregated as they arrive
Expand All @@ -58,13 +64,14 @@ func NewStatsd() *Statsd {

// Make data structures
s.done = make(chan struct{})
s.in = make(chan string, s.AllowedPendingMessages)
s.in = make(chan []byte, s.AllowedPendingMessages)
s.gauges = make(map[string]cachedgauge)
s.counters = make(map[string]cachedcounter)
s.sets = make(map[string]cachedset)
s.timings = make(map[string]cachedtimings)

s.ConvertNames = true
s.UDPPacketSize = UDP_PACKET_SIZE

return &s
}
Expand Down Expand Up @@ -139,6 +146,10 @@ const sampleConfig = `
# calculation of percentiles. Raising this limit increases the accuracy
# of percentiles but also increases the memory usage and cpu time.
percentile_limit = 1000
# UDP packet size for the server to listen for. This will depend on the size
# of the packets that the client is sending, which is usually 1500 bytes.
udp_packet_size = 1500
`

func (_ *Statsd) SampleConfig() string {
Expand Down Expand Up @@ -191,7 +202,7 @@ func (s *Statsd) Gather(acc inputs.Accumulator) error {
func (s *Statsd) Start() error {
// Make data structures
s.done = make(chan struct{})
s.in = make(chan string, s.AllowedPendingMessages)
s.in = make(chan []byte, s.AllowedPendingMessages)
s.gauges = make(map[string]cachedgauge)
s.counters = make(map[string]cachedcounter)
s.sets = make(map[string]cachedset)
Expand Down Expand Up @@ -220,36 +231,37 @@ func (s *Statsd) udpListen() error {
case <-s.done:
return nil
default:
buf := make([]byte, 1024)
buf := make([]byte, s.UDPPacketSize)
n, _, err := listener.ReadFromUDP(buf)
if err != nil {
log.Printf("ERROR: %s\n", err.Error())
}

lines := strings.Split(string(buf[:n]), "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
if line != "" {
select {
case s.in <- line:
default:
log.Printf(dropwarn, line)
}
}
select {
case s.in <- buf[:n]:
default:
log.Printf(dropwarn, string(buf[:n]))
}
}
}
}

// parser monitors the s.in channel, if there is a line ready, it parses the
// statsd string into a usable metric struct and aggregates the value
// parser monitors the s.in channel, if there is a packet ready, it parses the
// packet into statsd strings and then calls parseStatsdLine, which parses a
// single statsd metric into a struct.
func (s *Statsd) parser() error {
for {
select {
case <-s.done:
return nil
case line := <-s.in:
s.parseStatsdLine(line)
case packet := <-s.in:
lines := strings.Split(string(packet), "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
if line != "" {
s.parseStatsdLine(line)
}
}
}
}
}
Expand Down Expand Up @@ -499,6 +511,9 @@ func (s *Statsd) Stop() {

func init() {
inputs.Add("statsd", func() inputs.Input {
return &Statsd{ConvertNames: true}
return &Statsd{
ConvertNames: true,
UDPPacketSize: UDP_PACKET_SIZE,
}
})
}

0 comments on commit c55c06e

Please sign in to comment.