Skip to content

Commit

Permalink
Refactor UDP & TCP input buffers
Browse files Browse the repository at this point in the history
closes #991
  • Loading branch information
sparrc committed Apr 7, 2016
1 parent b534b58 commit feec6b4
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 9 deletions.
18 changes: 11 additions & 7 deletions plugins/inputs/tcp_listener/tcp_listener.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package tcp_listener

import (
"bufio"
"fmt"
"log"
"net"
Expand Down Expand Up @@ -39,7 +38,7 @@ type TcpListener struct {
acc telegraf.Accumulator
}

var dropwarn = "ERROR: Message queue full. Discarding metric. " +
var dropwarn = "ERROR: Message queue full. Discarding metric [%s], " +
"You may want to increase allowed_pending_messages in the config\n"

const sampleConfig = `
Expand Down Expand Up @@ -193,19 +192,24 @@ func (t *TcpListener) handler(conn *net.TCPConn, id string) {
t.forget(id)
}()

scanner := bufio.NewScanner(conn)
buf := make([]byte, 8192)
for {
select {
case <-t.done:
return
default:
if !scanner.Scan() {
return
n, err := conn.Read(buf)
if err != nil {
log.Printf("ERROR: %s\n", err.Error())
continue
}
bufCopy := make([]byte, n)
copy(bufCopy, buf[:n])

select {
case t.in <- scanner.Bytes():
case t.in <- bufCopy:
default:
log.Printf(dropwarn)
log.Printf(dropwarn, string(bufCopy))
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions plugins/inputs/udp_listener/udp_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,13 @@ func (u *UdpListener) udpListen() error {
log.Printf("ERROR: %s\n", err.Error())
continue
}
bufCopy := make([]byte, n)
copy(bufCopy, buf[:n])

select {
case u.in <- buf[:n]:
case u.in <- bufCopy:
default:
log.Printf(dropwarn, string(buf[:n]))
log.Printf(dropwarn, string(bufCopy))
}
}
}
Expand Down

0 comments on commit feec6b4

Please sign in to comment.