Skip to content

Commit

Permalink
Make the UDP input buffer only once
Browse files Browse the repository at this point in the history
  • Loading branch information
sparrc committed Apr 5, 2016
1 parent 4dd364e commit 2433d1c
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 17 deletions.
17 changes: 8 additions & 9 deletions plugins/inputs/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
)

const (
UDP_PACKET_SIZE int = 1500
// UDP packet limit, see
// https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure
UDP_PACKET_SIZE int = 65507

defaultFieldName = "value"

Expand Down Expand Up @@ -157,10 +159,6 @@ const sampleConfig = `
## calculation of percentiles. Raising this limit increases the accuracy
## of percentiles but also increases the memory usage and cpu time.
percentile_limit = 1000
## UDP packet size for the server to listen for. This will depend on the size
## of the packets that the client is sending, which is usually 1500 bytes.
udp_packet_size = 1500
`

func (_ *Statsd) SampleConfig() string {
Expand Down Expand Up @@ -274,12 +272,12 @@ func (s *Statsd) udpListen() error {
}
log.Println("Statsd listener listening on: ", s.listener.LocalAddr().String())

buf := make([]byte, s.UDPPacketSize)
for {
select {
case <-s.done:
return nil
default:
buf := make([]byte, s.UDPPacketSize)
n, _, err := s.listener.ReadFromUDP(buf)
if err != nil && !strings.Contains(err.Error(), "closed network") {
log.Printf("ERROR READ: %s\n", err.Error())
Expand All @@ -300,11 +298,12 @@ func (s *Statsd) udpListen() error {
// single statsd metric into a struct.
func (s *Statsd) parser() error {
defer s.wg.Done()
var packet []byte
for {
select {
case <-s.done:
return nil
case packet := <-s.in:
case packet = <-s.in:
lines := strings.Split(string(packet), "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
Expand Down Expand Up @@ -631,8 +630,8 @@ func (s *Statsd) Stop() {
func init() {
inputs.Add("statsd", func() telegraf.Input {
return &Statsd{
ConvertNames: true,
UDPPacketSize: UDP_PACKET_SIZE,
MetricSeparator: "_",
UDPPacketSize: UDP_PACKET_SIZE,
}
})
}
15 changes: 7 additions & 8 deletions plugins/inputs/udp_listener/udp_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ type UdpListener struct {
listener *net.UDPConn
}

const UDP_PACKET_SIZE int = 1500
// UDP packet limit, see
// https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure
const UDP_PACKET_SIZE int = 65507

var dropwarn = "ERROR: Message queue full. Discarding line [%s] " +
"You may want to increase allowed_pending_messages in the config\n"
Expand All @@ -43,11 +45,6 @@ const sampleConfig = `
## UDP listener will start dropping packets.
allowed_pending_messages = 10000
## UDP packet size for the server to listen for. This will depend
## on the size of the packets that the client is sending, which is
## usually 1500 bytes, but can be as large as 65,535 bytes.
udp_packet_size = 1500
## Data format to consume.
## Each data format has it's own unique set of configuration options, read
## more about them here:
Expand Down Expand Up @@ -107,12 +104,12 @@ func (u *UdpListener) udpListen() error {
}
log.Println("UDP server listening on: ", u.listener.LocalAddr().String())

buf := make([]byte, u.UDPPacketSize)
for {
select {
case <-u.done:
return nil
default:
buf := make([]byte, u.UDPPacketSize)
n, _, err := u.listener.ReadFromUDP(buf)
if err != nil && !strings.Contains(err.Error(), "closed network") {
log.Printf("ERROR: %s\n", err.Error())
Expand All @@ -130,11 +127,13 @@ func (u *UdpListener) udpListen() error {

func (u *UdpListener) udpParser() error {
defer u.wg.Done()

var packet []byte
for {
select {
case <-u.done:
return nil
case packet := <-u.in:
case packet = <-u.in:
metrics, err := u.parser.Parse(packet)
if err == nil {
u.storeMetrics(metrics)
Expand Down

0 comments on commit 2433d1c

Please sign in to comment.