-
Notifications
You must be signed in to change notification settings - Fork 5.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add socket listener & writer (#2094)
- Loading branch information
Showing
9 changed files
with
675 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,240 @@ | ||
package socket_listener | ||
|
||
import ( | ||
"bufio" | ||
"fmt" | ||
"io" | ||
"log" | ||
"net" | ||
"strings" | ||
"sync" | ||
|
||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/plugins/inputs" | ||
"github.com/influxdata/telegraf/plugins/parsers" | ||
) | ||
|
||
type setReadBufferer interface { | ||
SetReadBuffer(bytes int) error | ||
} | ||
|
||
type streamSocketListener struct { | ||
net.Listener | ||
*SocketListener | ||
|
||
connections map[string]net.Conn | ||
connectionsMtx sync.Mutex | ||
} | ||
|
||
func (ssl *streamSocketListener) listen() { | ||
ssl.connections = map[string]net.Conn{} | ||
|
||
for { | ||
c, err := ssl.Accept() | ||
if err != nil { | ||
ssl.AddError(err) | ||
break | ||
} | ||
|
||
ssl.connectionsMtx.Lock() | ||
if ssl.MaxConnections > 0 && len(ssl.connections) >= ssl.MaxConnections { | ||
ssl.connectionsMtx.Unlock() | ||
c.Close() | ||
continue | ||
} | ||
ssl.connections[c.RemoteAddr().String()] = c | ||
ssl.connectionsMtx.Unlock() | ||
go ssl.read(c) | ||
} | ||
|
||
ssl.connectionsMtx.Lock() | ||
for _, c := range ssl.connections { | ||
c.Close() | ||
} | ||
ssl.connectionsMtx.Unlock() | ||
} | ||
|
||
func (ssl *streamSocketListener) removeConnection(c net.Conn) { | ||
ssl.connectionsMtx.Lock() | ||
delete(ssl.connections, c.RemoteAddr().String()) | ||
ssl.connectionsMtx.Unlock() | ||
} | ||
|
||
func (ssl *streamSocketListener) read(c net.Conn) { | ||
defer ssl.removeConnection(c) | ||
defer c.Close() | ||
|
||
scnr := bufio.NewScanner(c) | ||
for scnr.Scan() { | ||
metrics, err := ssl.Parse(scnr.Bytes()) | ||
if err != nil { | ||
ssl.AddError(fmt.Errorf("unable to parse incoming line")) | ||
//TODO rate limit | ||
continue | ||
} | ||
for _, m := range metrics { | ||
ssl.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) | ||
} | ||
} | ||
|
||
if err := scnr.Err(); err != nil { | ||
ssl.AddError(err) | ||
} | ||
} | ||
|
||
type packetSocketListener struct { | ||
net.PacketConn | ||
*SocketListener | ||
} | ||
|
||
func (psl *packetSocketListener) listen() { | ||
buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet | ||
for { | ||
n, _, err := psl.ReadFrom(buf) | ||
if err != nil { | ||
psl.AddError(err) | ||
break | ||
} | ||
|
||
metrics, err := psl.Parse(buf[:n]) | ||
if err != nil { | ||
psl.AddError(fmt.Errorf("unable to parse incoming packet")) | ||
//TODO rate limit | ||
continue | ||
} | ||
for _, m := range metrics { | ||
psl.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) | ||
} | ||
} | ||
} | ||
|
||
type SocketListener struct { | ||
ServiceAddress string | ||
MaxConnections int | ||
ReadBufferSize int | ||
|
||
parsers.Parser | ||
telegraf.Accumulator | ||
io.Closer | ||
} | ||
|
||
func (sl *SocketListener) Description() string { | ||
return "Generic socket listener capable of handling multiple socket types." | ||
} | ||
|
||
func (sl *SocketListener) SampleConfig() string { | ||
return ` | ||
## URL to listen on | ||
# service_address = "tcp://:8094" | ||
# service_address = "tcp://127.0.0.1:http" | ||
# service_address = "tcp4://:8094" | ||
# service_address = "tcp6://:8094" | ||
# service_address = "tcp6://[2001:db8::1]:8094" | ||
# service_address = "udp://:8094" | ||
# service_address = "udp4://:8094" | ||
# service_address = "udp6://:8094" | ||
# service_address = "unix:///tmp/telegraf.sock" | ||
# service_address = "unixgram:///tmp/telegraf.sock" | ||
## Maximum number of concurrent connections. | ||
## Only applies to stream sockets (e.g. TCP). | ||
## 0 (default) is unlimited. | ||
# max_connections = 1024 | ||
## Maximum socket buffer size in bytes. | ||
## For stream sockets, once the buffer fills up, the sender will start backing up. | ||
## For datagram sockets, once the buffer fills up, metrics will start dropping. | ||
## Defaults to the OS default. | ||
# read_buffer_size = 65535 | ||
## Data format to consume. | ||
## Each data format has it's own unique set of configuration options, read | ||
## more about them here: | ||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md | ||
# data_format = "influx" | ||
` | ||
} | ||
|
||
func (sl *SocketListener) Gather(_ telegraf.Accumulator) error { | ||
return nil | ||
} | ||
|
||
func (sl *SocketListener) SetParser(parser parsers.Parser) { | ||
sl.Parser = parser | ||
} | ||
|
||
func (sl *SocketListener) Start(acc telegraf.Accumulator) error { | ||
sl.Accumulator = acc | ||
spl := strings.SplitN(sl.ServiceAddress, "://", 2) | ||
if len(spl) != 2 { | ||
return fmt.Errorf("invalid service address: %s", sl.ServiceAddress) | ||
} | ||
|
||
switch spl[0] { | ||
case "tcp", "tcp4", "tcp6", "unix", "unixpacket": | ||
l, err := net.Listen(spl[0], spl[1]) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if sl.ReadBufferSize > 0 { | ||
if srb, ok := l.(setReadBufferer); ok { | ||
srb.SetReadBuffer(sl.ReadBufferSize) | ||
} else { | ||
log.Printf("W! Unable to set read buffer on a %s socket", spl[0]) | ||
} | ||
} | ||
|
||
ssl := &streamSocketListener{ | ||
Listener: l, | ||
SocketListener: sl, | ||
} | ||
|
||
sl.Closer = ssl | ||
go ssl.listen() | ||
case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram": | ||
pc, err := net.ListenPacket(spl[0], spl[1]) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if sl.ReadBufferSize > 0 { | ||
if srb, ok := pc.(setReadBufferer); ok { | ||
srb.SetReadBuffer(sl.ReadBufferSize) | ||
} else { | ||
log.Printf("W! Unable to set read buffer on a %s socket", spl[0]) | ||
} | ||
} | ||
|
||
psl := &packetSocketListener{ | ||
PacketConn: pc, | ||
SocketListener: sl, | ||
} | ||
|
||
sl.Closer = psl | ||
go psl.listen() | ||
default: | ||
return fmt.Errorf("unknown protocol '%s' in '%s'", spl[0], sl.ServiceAddress) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (sl *SocketListener) Stop() { | ||
if sl.Closer != nil { | ||
sl.Close() | ||
sl.Closer = nil | ||
} | ||
} | ||
|
||
func newSocketListener() *SocketListener { | ||
parser, _ := parsers.NewInfluxParser() | ||
|
||
return &SocketListener{ | ||
Parser: parser, | ||
} | ||
} | ||
|
||
func init() { | ||
inputs.Add("socket_listener", func() telegraf.Input { return newSocketListener() }) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
package socket_listener | ||
|
||
import ( | ||
"net" | ||
"os" | ||
"testing" | ||
"time" | ||
|
||
"github.com/influxdata/telegraf/testutil" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestSocketListener_tcp(t *testing.T) { | ||
sl := newSocketListener() | ||
sl.ServiceAddress = "tcp://127.0.0.1:0" | ||
|
||
acc := &testutil.Accumulator{} | ||
err := sl.Start(acc) | ||
require.NoError(t, err) | ||
|
||
client, err := net.Dial("tcp", sl.Closer.(net.Listener).Addr().String()) | ||
require.NoError(t, err) | ||
|
||
testSocketListener(t, sl, client) | ||
} | ||
|
||
func TestSocketListener_udp(t *testing.T) { | ||
sl := newSocketListener() | ||
sl.ServiceAddress = "udp://127.0.0.1:0" | ||
|
||
acc := &testutil.Accumulator{} | ||
err := sl.Start(acc) | ||
require.NoError(t, err) | ||
|
||
client, err := net.Dial("udp", sl.Closer.(net.PacketConn).LocalAddr().String()) | ||
require.NoError(t, err) | ||
|
||
testSocketListener(t, sl, client) | ||
} | ||
|
||
func TestSocketListener_unix(t *testing.T) { | ||
defer os.Remove("/tmp/telegraf_test.sock") | ||
sl := newSocketListener() | ||
sl.ServiceAddress = "unix:///tmp/telegraf_test.sock" | ||
|
||
acc := &testutil.Accumulator{} | ||
err := sl.Start(acc) | ||
require.NoError(t, err) | ||
|
||
client, err := net.Dial("unix", "/tmp/telegraf_test.sock") | ||
require.NoError(t, err) | ||
|
||
testSocketListener(t, sl, client) | ||
} | ||
|
||
func TestSocketListener_unixgram(t *testing.T) { | ||
defer os.Remove("/tmp/telegraf_test.sock") | ||
sl := newSocketListener() | ||
sl.ServiceAddress = "unixgram:///tmp/telegraf_test.sock" | ||
|
||
acc := &testutil.Accumulator{} | ||
err := sl.Start(acc) | ||
require.NoError(t, err) | ||
|
||
client, err := net.Dial("unixgram", "/tmp/telegraf_test.sock") | ||
require.NoError(t, err) | ||
|
||
testSocketListener(t, sl, client) | ||
} | ||
|
||
func testSocketListener(t *testing.T, sl *SocketListener, client net.Conn) { | ||
mstr12 := "test,foo=bar v=1i 123456789\ntest,foo=baz v=2i 123456790\n" | ||
mstr3 := "test,foo=zab v=3i 123456791" | ||
client.Write([]byte(mstr12)) | ||
client.Write([]byte(mstr3)) | ||
if _, ok := client.(net.Conn); ok { | ||
// stream connection. needs trailing newline to terminate mstr3 | ||
client.Write([]byte{'\n'}) | ||
} | ||
|
||
acc := sl.Accumulator.(*testutil.Accumulator) | ||
|
||
acc.Lock() | ||
if len(acc.Metrics) < 1 { | ||
acc.Wait() | ||
} | ||
require.True(t, len(acc.Metrics) >= 1) | ||
m := acc.Metrics[0] | ||
acc.Unlock() | ||
|
||
assert.Equal(t, "test", m.Measurement) | ||
assert.Equal(t, map[string]string{"foo": "bar"}, m.Tags) | ||
assert.Equal(t, map[string]interface{}{"v": int64(1)}, m.Fields) | ||
assert.True(t, time.Unix(0, 123456789).Equal(m.Time)) | ||
|
||
acc.Lock() | ||
if len(acc.Metrics) < 2 { | ||
acc.Wait() | ||
} | ||
require.True(t, len(acc.Metrics) >= 2) | ||
m = acc.Metrics[1] | ||
acc.Unlock() | ||
|
||
assert.Equal(t, "test", m.Measurement) | ||
assert.Equal(t, map[string]string{"foo": "baz"}, m.Tags) | ||
assert.Equal(t, map[string]interface{}{"v": int64(2)}, m.Fields) | ||
assert.True(t, time.Unix(0, 123456790).Equal(m.Time)) | ||
|
||
acc.Lock() | ||
if len(acc.Metrics) < 3 { | ||
acc.Wait() | ||
} | ||
require.True(t, len(acc.Metrics) >= 3) | ||
m = acc.Metrics[2] | ||
acc.Unlock() | ||
|
||
assert.Equal(t, "test", m.Measurement) | ||
assert.Equal(t, map[string]string{"foo": "zab"}, m.Tags) | ||
assert.Equal(t, map[string]interface{}{"v": int64(3)}, m.Fields) | ||
assert.True(t, time.Unix(0, 123456791).Equal(m.Time)) | ||
} |
Oops, something went wrong.