Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the UDP input buffer only once #976

Merged
merged 1 commit into from
Apr 5, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## v0.12.1 [unreleased]

### Features
- [#976](https://github.com/influxdata/telegraf/pull/976): Reduce allocations in the UDP and statsd inputs.

### Bugfixes
- [#968](https://github.com/influxdata/telegraf/issues/968): Processes plugin gets unknown state when spaces are in (command name)
Expand Down
17 changes: 8 additions & 9 deletions plugins/inputs/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
)

const (
UDP_PACKET_SIZE int = 1500
// UDP packet limit, see
// https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure
UDP_PACKET_SIZE int = 65507

defaultFieldName = "value"

Expand Down Expand Up @@ -157,10 +159,6 @@ const sampleConfig = `
## calculation of percentiles. Raising this limit increases the accuracy
## of percentiles but also increases the memory usage and cpu time.
percentile_limit = 1000

## UDP packet size for the server to listen for. This will depend on the size
## of the packets that the client is sending, which is usually 1500 bytes.
udp_packet_size = 1500
`

func (_ *Statsd) SampleConfig() string {
Expand Down Expand Up @@ -274,12 +272,12 @@ func (s *Statsd) udpListen() error {
}
log.Println("Statsd listener listening on: ", s.listener.LocalAddr().String())

buf := make([]byte, s.UDPPacketSize)
for {
select {
case <-s.done:
return nil
default:
buf := make([]byte, s.UDPPacketSize)
n, _, err := s.listener.ReadFromUDP(buf)
if err != nil && !strings.Contains(err.Error(), "closed network") {
log.Printf("ERROR READ: %s\n", err.Error())
Expand All @@ -300,11 +298,12 @@ func (s *Statsd) udpListen() error {
// single statsd metric into a struct.
func (s *Statsd) parser() error {
defer s.wg.Done()
var packet []byte
for {
select {
case <-s.done:
return nil
case packet := <-s.in:
case packet = <-s.in:
lines := strings.Split(string(packet), "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
Expand Down Expand Up @@ -631,8 +630,8 @@ func (s *Statsd) Stop() {
func init() {
inputs.Add("statsd", func() telegraf.Input {
return &Statsd{
ConvertNames: true,
UDPPacketSize: UDP_PACKET_SIZE,
MetricSeparator: "_",
UDPPacketSize: UDP_PACKET_SIZE,
}
})
}
15 changes: 7 additions & 8 deletions plugins/inputs/udp_listener/udp_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ type UdpListener struct {
listener *net.UDPConn
}

const UDP_PACKET_SIZE int = 1500
// UDP packet limit, see
// https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure
const UDP_PACKET_SIZE int = 65507

var dropwarn = "ERROR: Message queue full. Discarding line [%s] " +
"You may want to increase allowed_pending_messages in the config\n"
Expand All @@ -43,11 +45,6 @@ const sampleConfig = `
## UDP listener will start dropping packets.
allowed_pending_messages = 10000

## UDP packet size for the server to listen for. This will depend
## on the size of the packets that the client is sending, which is
## usually 1500 bytes, but can be as large as 65,535 bytes.
udp_packet_size = 1500

## Data format to consume.
## Each data format has it's own unique set of configuration options, read
## more about them here:
Expand Down Expand Up @@ -107,12 +104,12 @@ func (u *UdpListener) udpListen() error {
}
log.Println("UDP server listening on: ", u.listener.LocalAddr().String())

buf := make([]byte, u.UDPPacketSize)
for {
select {
case <-u.done:
return nil
default:
buf := make([]byte, u.UDPPacketSize)
n, _, err := u.listener.ReadFromUDP(buf)
if err != nil && !strings.Contains(err.Error(), "closed network") {
log.Printf("ERROR: %s\n", err.Error())
Expand All @@ -130,11 +127,13 @@ func (u *UdpListener) udpListen() error {

func (u *UdpListener) udpParser() error {
defer u.wg.Done()

var packet []byte
for {
select {
case <-u.done:
return nil
case packet := <-u.in:
case packet = <-u.in:
metrics, err := u.parser.Parse(packet)
if err == nil {
u.storeMetrics(metrics)
Expand Down