diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 5257a31b9e9c..5f2b3eb6eef5 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -52,6 +52,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Remove the undefined `username` option from the Redis input and clarify the documentation. {pull}6662[6662]
- Add validation for Stdin, when Filebeat is configured with Stdin and any other inputs, Filebeat
will now refuses to start. {pull}6463[6463]
+- Addition of the TCP input {pull}6700[6700]
*Heartbeat*
diff --git a/filebeat/_meta/common.reference.p2.yml b/filebeat/_meta/common.reference.p2.yml
index 4e922d48a92d..91e66d013004 100644
--- a/filebeat/_meta/common.reference.p2.yml
+++ b/filebeat/_meta/common.reference.p2.yml
@@ -240,6 +240,23 @@ filebeat.inputs:
# Maximum size of the message received over UDP
#max_message_size: 10240
+#------------------------------ TCP prospector --------------------------------
+# Experimental: Config options for the TCP input
+#- type: tcp
+ #enabled: false
+
+ # The host and port to receive the new event
+ #host: "localhost:9000"
+
+ # Character used to split new message
+ #line_delimiter: "\n"
+
+ # Maximum size in bytes of the message received over TCP
+ #max_message_size: 20MiB
+
+ # The number of seconds of inactivity before a remote connection is closed.
+ #timeout: 300s
+
#========================== Filebeat autodiscover ==============================
# Autodiscover allows you to detect changes in the system and spawn new modules
diff --git a/filebeat/docs/filebeat-options.asciidoc b/filebeat/docs/filebeat-options.asciidoc
index 6b1ace6c3945..ba2109e7e33b 100644
--- a/filebeat/docs/filebeat-options.asciidoc
+++ b/filebeat/docs/filebeat-options.asciidoc
@@ -14,7 +14,7 @@ and configuring modules.
To configure {beatname_uc} manually (instead of using
<<{beatname_lc}-modules-overview,modules>>), you specify a list of inputs in the
+{beatname_lc}.inputs+ section of the +{beatname_lc}.yml+. Inputs specify how
-{beatname_uc} locates and processes input data.
+{beatname_uc} locates and processes input data.
The list is a http://yaml.org/[YAML] array, so each input begins with
a dash (`-`). You can specify multiple inputs, and you can specify the same
@@ -47,6 +47,7 @@ You can configure {beatname_uc} to use the following inputs:
* <<{beatname_lc}-input-redis>>
* <<{beatname_lc}-input-udp>>
* <<{beatname_lc}-input-docker>>
+* <<{beatname_lc}-input-tcp>>
@@ -59,3 +60,5 @@ include::inputs/input-redis.asciidoc[]
include::inputs/input-udp.asciidoc[]
include::inputs/input-docker.asciidoc[]
+
+include::inputs/input-tcp.asciidoc[]
diff --git a/filebeat/docs/inputs/input-tcp.asciidoc b/filebeat/docs/inputs/input-tcp.asciidoc
new file mode 100644
index 000000000000..685bee40661d
--- /dev/null
+++ b/filebeat/docs/inputs/input-tcp.asciidoc
@@ -0,0 +1,55 @@
+:type: tcp
+
+[id="{beatname_lc}-input-{type}"]
+=== TCP input
+
+++++
+TCP
+++++
+
+Use the `TCP` input to read events over TCP.
+
+Example configuration:
+
+["source","yaml",subs="attributes"]
+----
+{beatname_lc}.inputs:
+- type: tcp
+ max_message_size: 10240
+ host: "localhost:9000"
+----
+
+
+==== Configuration options
+
+The `tcp` input supports the following configuration options plus the
+<<{beatname_lc}-input-{type}-common-options>> described later.
+
+[float]
+[id="{beatname_lc}-input-{type}-max-message-size"]
+==== `max_message_size`
+
+The maximum size of the message received over TCP. The default is `20MiB`.
+
+[float]
+[id="{beatname_lc}-input-{type}-host"]
+==== `host`
+
+The host and TCP port to listen on for event streams.
+
+[float]
+[id="{beatname_lc}-input-{type}-line-delimiter"]
+==== `line_delimiter`
+
+Specify the characters used to split the incoming events. The default is '\n'.
+
+[float]
+[id="{beatname_lc}-input-{type}-timeout"]
+==== `timeout`
+
+The number of seconds of inactivity before a remote connection is closed. The default is `300s`.
+
+[id="{beatname_lc}-input-{type}-common-options"]
+include::../inputs/input-common-options.asciidoc[]
+
+:type!:
diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml
index 664dcea51be9..15506ec236b5 100644
--- a/filebeat/filebeat.reference.yml
+++ b/filebeat/filebeat.reference.yml
@@ -535,6 +535,23 @@ filebeat.inputs:
# Maximum size of the message received over UDP
#max_message_size: 10240
+#------------------------------ TCP prospector --------------------------------
+# Experimental: Config options for the TCP input
+#- type: tcp
+ #enabled: false
+
+ # The host and port to receive the new event
+ #host: "localhost:9000"
+
+ # Character used to split new message
+ #line_delimiter: "\n"
+
+ # Maximum size in bytes of the message received over TCP
+ #max_message_size: 20MiB
+
+ # The number of seconds of inactivity before a remote connection is closed.
+ #timeout: 300s
+
#========================== Filebeat autodiscover ==============================
# Autodiscover allows you to detect changes in the system and spawn new modules
diff --git a/filebeat/include/list.go b/filebeat/include/list.go
index e1aae4e763ac..374be396eced 100644
--- a/filebeat/include/list.go
+++ b/filebeat/include/list.go
@@ -12,5 +12,6 @@ import (
_ "github.com/elastic/beats/filebeat/input/log"
_ "github.com/elastic/beats/filebeat/input/redis"
_ "github.com/elastic/beats/filebeat/input/stdin"
+ _ "github.com/elastic/beats/filebeat/input/tcp"
_ "github.com/elastic/beats/filebeat/input/udp"
)
diff --git a/filebeat/input/tcp/config.go b/filebeat/input/tcp/config.go
new file mode 100644
index 000000000000..7b30b2a96402
--- /dev/null
+++ b/filebeat/input/tcp/config.go
@@ -0,0 +1,26 @@
+package tcp
+
+import (
+ "time"
+
+ "github.com/dustin/go-humanize"
+
+ "github.com/elastic/beats/filebeat/harvester"
+ "github.com/elastic/beats/filebeat/inputsource/tcp"
+)
+
+type config struct {
+ tcp.Config `config:",inline"`
+ harvester.ForwarderConfig `config:",inline"`
+}
+
+var defaultConfig = config{
+ ForwarderConfig: harvester.ForwarderConfig{
+ Type: "tcp",
+ },
+ Config: tcp.Config{
+ LineDelimiter: "\n",
+ Timeout: time.Minute * 5,
+ MaxMessageSize: 20 * humanize.MiByte,
+ },
+}
diff --git a/filebeat/input/tcp/input.go b/filebeat/input/tcp/input.go
new file mode 100644
index 000000000000..2153d4154a0e
--- /dev/null
+++ b/filebeat/input/tcp/input.go
@@ -0,0 +1,109 @@
+package tcp
+
+import (
+ "time"
+
+ "github.com/elastic/beats/filebeat/channel"
+ "github.com/elastic/beats/filebeat/harvester"
+ "github.com/elastic/beats/filebeat/input"
+ "github.com/elastic/beats/filebeat/inputsource/tcp"
+ "github.com/elastic/beats/filebeat/util"
+ "github.com/elastic/beats/libbeat/beat"
+ "github.com/elastic/beats/libbeat/common"
+ "github.com/elastic/beats/libbeat/common/atomic"
+ "github.com/elastic/beats/libbeat/common/cfgwarn"
+ "github.com/elastic/beats/libbeat/logp"
+)
+
+func init() {
+ err := input.Register("tcp", NewInput)
+ if err != nil {
+ panic(err)
+ }
+}
+
+// Input for TCP connection
+type Input struct {
+ server *tcp.Server
+ started atomic.Bool
+ outlet channel.Outleter
+ config *config
+ log *logp.Logger
+}
+
+// NewInput creates a new TCP input
+func NewInput(
+ cfg *common.Config,
+ outlet channel.Factory,
+ context input.Context,
+) (input.Input, error) {
+ cfgwarn.Experimental("TCP input type is used")
+
+ out, err := outlet(cfg, context.DynamicFields)
+ if err != nil {
+ return nil, err
+ }
+
+ forwarder := harvester.NewForwarder(out)
+
+ config := defaultConfig
+ err = cfg.Unpack(&config)
+ if err != nil {
+ return nil, err
+ }
+
+ cb := func(data []byte, metadata tcp.Metadata) {
+ event := createEvent(data, metadata)
+ forwarder.Send(event)
+ }
+
+ server, err := tcp.New(cb, &config.Config)
+ if err != nil {
+ return nil, err
+ }
+
+ return &Input{
+ server: server,
+ started: atomic.MakeBool(false),
+ outlet: out,
+ config: &config,
+ log: logp.NewLogger("tcp input").With(config.Config.Host),
+ }, nil
+}
+
+// Run start a TCP input
+func (p *Input) Run() {
+ if !p.started.Load() {
+ p.log.Info("Starting TCP input")
+ err := p.server.Start()
+ if err != nil {
+ p.log.Errorw("Error starting the TCP server", "error", err)
+ }
+ p.started.Swap(true)
+ }
+}
+
+// Stop stops TCP server
+func (p *Input) Stop() {
+ p.log.Info("Stopping TCP input")
+ defer p.outlet.Close()
+ defer p.started.Swap(false)
+ p.server.Stop()
+}
+
+// Wait stop the current server
+func (p *Input) Wait() {
+ p.Stop()
+}
+
+func createEvent(raw []byte, metadata tcp.Metadata) *util.Data {
+ data := util.NewData()
+ data.Event = beat.Event{
+ Timestamp: time.Now(),
+ Fields: common.MapStr{
+ "message": string(raw),
+ "source": metadata.RemoteAddr.String(),
+ },
+ }
+ return data
+}
diff --git a/filebeat/input/tcp/input_test.go b/filebeat/input/tcp/input_test.go
new file mode 100644
index 000000000000..80a32f7a9e50
--- /dev/null
+++ b/filebeat/input/tcp/input_test.go
@@ -0,0 +1,30 @@
+package tcp
+
+import (
+ "net"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/elastic/beats/filebeat/inputsource/tcp"
+)
+
+func TestCreateEvent(t *testing.T) {
+ hello := "hello world"
+ ip := "127.0.0.1"
+ parsedIP := net.ParseIP(ip)
+ addr := &net.IPAddr{IP: parsedIP, Zone: ""}
+
+ message := []byte(hello)
+ mt := tcp.Metadata{RemoteAddr: addr}
+
+ data := createEvent(message, mt)
+ event := data.GetEvent()
+
+ m, err := event.GetValue("message")
+ assert.NoError(t, err)
+ assert.Equal(t, string(message), m)
+
+ from, _ := event.GetValue("source")
+ assert.Equal(t, ip, from)
+}
diff --git a/filebeat/inputsource/tcp/client.go b/filebeat/inputsource/tcp/client.go
new file mode 100644
index 000000000000..b9c4528a1d27
--- /dev/null
+++ b/filebeat/inputsource/tcp/client.go
@@ -0,0 +1,78 @@
+package tcp
+
+import (
+ "bufio"
+ "net"
+ "time"
+
+ "github.com/pkg/errors"
+
+ "github.com/elastic/beats/libbeat/logp"
+)
+
+// Client is a remote client.
+type client struct {
+ conn net.Conn
+ log *logp.Logger
+ callback CallbackFunc
+ done chan struct{}
+ metadata Metadata
+ splitFunc bufio.SplitFunc
+ maxReadMessage size
+ timeout time.Duration
+}
+
+func newClient(
+ conn net.Conn,
+ log *logp.Logger,
+ callback CallbackFunc,
+ splitFunc bufio.SplitFunc,
+ maxReadMessage size,
+ timeout time.Duration,
+) *client {
+ client := &client{
+ conn: conn,
+ log: log.With("address", conn.RemoteAddr()),
+ callback: callback,
+ done: make(chan struct{}),
+ splitFunc: splitFunc,
+ maxReadMessage: maxReadMessage,
+ timeout: timeout,
+ metadata: Metadata{
+ RemoteAddr: conn.RemoteAddr(),
+ },
+ }
+ return client
+}
+
+func (c *client) handle() error {
+ r := NewResetableLimitedReader(NewDeadlineReader(c.conn, c.timeout), uint64(c.maxReadMessage))
+ buf := bufio.NewReader(r)
+ scanner := bufio.NewScanner(buf)
+ scanner.Split(c.splitFunc)
+
+ for scanner.Scan() {
+ err := scanner.Err()
+ if err != nil {
+ // we are forcing a close on the socket, lets ignore any error that could happen.
+ select {
+ case <-c.done:
+ break
+ default:
+ }
+ // This is a user defined limit and we should notify the user.
+ if IsMaxReadBufferErr(err) {
+ c.log.Errorw("client errors", "error", err)
+ }
+ return errors.Wrap(err, "tcp client error")
+ }
+ r.Reset()
+ c.callback(scanner.Bytes(), c.metadata)
+ }
+ return nil
+}
+
+func (c *client) close() {
+ close(c.done)
+ c.conn.Close()
+}
diff --git a/filebeat/inputsource/tcp/config.go b/filebeat/inputsource/tcp/config.go
new file mode 100644
index 000000000000..c4164dac8c55
--- /dev/null
+++ b/filebeat/inputsource/tcp/config.go
@@ -0,0 +1,36 @@
+package tcp
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/dustin/go-humanize"
+)
+
+type size uint64
+
+// Config exposes the tcp configuration.
+type Config struct {
+ Host string `config:"host"`
+ LineDelimiter string `config:"line_delimiter" validate:"nonzero"`
+ Timeout time.Duration `config:"timeout" validate:"nonzero,positive"`
+ MaxMessageSize size `config:"max_message_size" validate:"nonzero,positive"`
+}
+
+// Validate validates the Config option for the tcp input.
+func (c *Config) Validate() error {
+ if len(c.Host) == 0 {
+ return fmt.Errorf("need to specify the host using the `host:port` syntax")
+ }
+ return nil
+}
+
+func (s *size) Unpack(value string) error {
+ sz, err := humanize.ParseBytes(value)
+ if err != nil {
+ return err
+ }
+
+ *s = size(sz)
+ return nil
+}
diff --git a/filebeat/inputsource/tcp/conn.go b/filebeat/inputsource/tcp/conn.go
new file mode 100644
index 000000000000..a2a40bb02f0e
--- /dev/null
+++ b/filebeat/inputsource/tcp/conn.go
@@ -0,0 +1,72 @@
+package tcp
+
+import (
+ "io"
+ "net"
+ "time"
+
+ "github.com/pkg/errors"
+)
+
+// ErrMaxReadBuffer returns when too many bytes was read on the io.Reader
+var ErrMaxReadBuffer = errors.New("max read buffer reached")
+
+// ResetableLimitedReader is based on LimitedReader but allow to reset the byte read and return a specific
+// error when we reach the limit.
+type ResetableLimitedReader struct {
+ reader io.Reader
+ maxReadBuffer uint64
+ byteRead uint64
+}
+
+// NewResetableLimitedReader returns a new ResetableLimitedReader
+func NewResetableLimitedReader(reader io.Reader, maxReadBuffer uint64) *ResetableLimitedReader {
+ return &ResetableLimitedReader{
+ reader: reader,
+ maxReadBuffer: maxReadBuffer,
+ }
+}
+
+// Read reads the specified amount of byte
+func (m *ResetableLimitedReader) Read(p []byte) (n int, err error) {
+ if m.byteRead >= m.maxReadBuffer {
+ return 0, ErrMaxReadBuffer
+ }
+ n, err = m.reader.Read(p)
+ m.byteRead += uint64(n)
+ return
+}
+
+// Reset resets the number of byte read
+func (m *ResetableLimitedReader) Reset() {
+ m.byteRead = 0
+}
+
+// IsMaxReadBufferErr returns true when the error is ErrMaxReadBuffer
+func IsMaxReadBufferErr(err error) bool {
+ return err == ErrMaxReadBuffer
+}
+
+// DeadlineReader allow read to a io.Reader to timeout, the timeout is refreshed on every read.
+type DeadlineReader struct {
+ conn net.Conn
+ timeout time.Duration
+}
+
+// NewDeadlineReader returns a new DeadlineReader
+func NewDeadlineReader(c net.Conn, timeout time.Duration) *DeadlineReader {
+ return &DeadlineReader{
+ conn: c,
+ timeout: timeout,
+ }
+}
+
+// Read reads the number of bytes from the reader
+func (d *DeadlineReader) Read(p []byte) (n int, err error) {
+ d.refresh()
+ return d.conn.Read(p)
+}
+
+func (d *DeadlineReader) refresh() {
+ d.conn.SetDeadline(time.Now().Add(d.timeout))
+}
diff --git a/filebeat/inputsource/tcp/conn_test.go b/filebeat/inputsource/tcp/conn_test.go
new file mode 100644
index 000000000000..aaf2bea6a16a
--- /dev/null
+++ b/filebeat/inputsource/tcp/conn_test.go
@@ -0,0 +1,43 @@
+package tcp
+
+import (
+ "strings"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestResetableLimitedReader(t *testing.T) {
+ maxReadBuffer := 400
+
+ t.Run("WhenMaxReadIsReachedInMultipleRead", func(t *testing.T) {
+ r := strings.NewReader(randomString(maxReadBuffer * 2))
+ m := NewResetableLimitedReader(r, uint64(maxReadBuffer))
+ toRead := make([]byte, maxReadBuffer)
+ _, err := m.Read(toRead)
+ assert.NoError(t, err)
+ toRead = make([]byte, 300)
+ _, err = m.Read(toRead)
+ assert.Equal(t, ErrMaxReadBuffer, err)
+ })
+
+ t.Run("WhenMaxReadIsNotReached", func(t *testing.T) {
+ r := strings.NewReader(randomString(maxReadBuffer * 2))
+ m := NewResetableLimitedReader(r, uint64(maxReadBuffer))
+ toRead := make([]byte, maxReadBuffer)
+ _, err := m.Read(toRead)
+ assert.NoError(t, err)
+ })
+
+ t.Run("WhenResetIsCalled", func(t *testing.T) {
+ r := strings.NewReader(randomString(maxReadBuffer * 2))
+ m := NewResetableLimitedReader(r, uint64(maxReadBuffer))
+ toRead := make([]byte, maxReadBuffer)
+ _, err := m.Read(toRead)
+ assert.NoError(t, err)
+ m.Reset()
+ toRead = make([]byte, 300)
+ _, err = m.Read(toRead)
+ assert.NoError(t, err)
+ })
+}
diff --git a/filebeat/inputsource/tcp/scan.go b/filebeat/inputsource/tcp/scan.go
new file mode 100644
index 000000000000..4e9481581d27
--- /dev/null
+++ b/filebeat/inputsource/tcp/scan.go
@@ -0,0 +1,31 @@
+package tcp
+
+import (
+ "bufio"
+ "bytes"
+)
+
+// factoryDelimiter return a function to split line using a custom delimiter supporting multibytes
+// delimiter, the delimiter is stripped from the returned value.
+func factoryDelimiter(delimiter []byte) bufio.SplitFunc {
+ return func(data []byte, eof bool) (int, []byte, error) {
+ if eof && len(data) == 0 {
+ return 0, nil, nil
+ }
+ if i := bytes.Index(data, delimiter); i >= 0 {
+ return i + len(delimiter), dropDelimiter(data[0:i], delimiter), nil
+ }
+ if eof {
+ return len(data), dropDelimiter(data, delimiter), nil
+ }
+ return 0, nil, nil
+ }
+}
+
+func dropDelimiter(data []byte, delimiter []byte) []byte {
+ if len(data) > len(delimiter) &&
+ bytes.Equal(data[len(data)-len(delimiter):len(data)], delimiter) {
+ return data[0 : len(data)-len(delimiter)]
+ }
+ return data
+}
diff --git a/filebeat/inputsource/tcp/scan_test.go b/filebeat/inputsource/tcp/scan_test.go
new file mode 100644
index 000000000000..87b28431965d
--- /dev/null
+++ b/filebeat/inputsource/tcp/scan_test.go
@@ -0,0 +1,91 @@
+package tcp
+
+import (
+ "bufio"
+ "strings"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestCustomDelimiter(t *testing.T) {
+ tests := []struct {
+ name string
+ text string
+ expected []string
+ delimiter []byte
+ }{
+ {
+ name: "Multiple chars delimiter",
+ text: "hellobonjourholahey",
+ expected: []string{
+ "hello",
+ "bonjour",
+ "hola",
+ "hey",
+ },
+ delimiter: []byte(""),
+ },
+ {
+ name: "Multiple chars delimiter with half starting delimiter",
+ text: "hellobonjourhey",
+ expected: []string{
+ "hello",
+ "bonjour"),
+ },
+ {
+ name: "Multiple chars delimiter with half ending delimiter",
+ text: "helloEND>holahey",
+ expected: []string{
+ "hello",
+ "END>hola",
+ "hey",
+ },
+ delimiter: []byte(""),
+ },
+ {
+ name: "Delimiter end of string",
+ text: "hellobonjourholahey",
+ expected: []string{
+ "hello",
+ "bonjour",
+ "hola",
+ "hey",
+ },
+ delimiter: []byte(""),
+ },
+ {
+ name: "Single char delimiter",
+ text: "hello;bonjour;hola;hey",
+ expected: []string{
+ "hello",
+ "bonjour",
+ "hola",
+ "hey",
+ },
+ delimiter: []byte(";"),
+ },
+ {
+ name: "Empty string",
+ text: "",
+ expected: []string(nil),
+ delimiter: []byte(";"),
+ },
+ }
+
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ buf := strings.NewReader(test.text)
+ scanner := bufio.NewScanner(buf)
+ scanner.Split(factoryDelimiter(test.delimiter))
+ var elements []string
+ for scanner.Scan() {
+ elements = append(elements, scanner.Text())
+ }
+ assert.EqualValues(t, test.expected, elements)
+ })
+ }
+}
diff --git a/filebeat/inputsource/tcp/server.go b/filebeat/inputsource/tcp/server.go
new file mode 100644
index 000000000000..108f04baa3ae
--- /dev/null
+++ b/filebeat/inputsource/tcp/server.go
@@ -0,0 +1,167 @@
+package tcp
+
+import (
+ "bufio"
+ "bytes"
+ "fmt"
+ "net"
+ "sync"
+
+ "github.com/elastic/beats/libbeat/logp"
+)
+
+// Metadata information about the remote host.
+type Metadata struct {
+ RemoteAddr net.Addr
+}
+
+// CallbackFunc receives new events read from the TCP socket.
+type CallbackFunc = func(data []byte, metadata Metadata)
+
+// Server represent a TCP server
+type Server struct {
+ sync.RWMutex
+ callback CallbackFunc
+ config *Config
+ Listener net.Listener
+ clients map[*client]struct{}
+ wg sync.WaitGroup
+ done chan struct{}
+ splitFunc bufio.SplitFunc
+ log *logp.Logger
+}
+
+// New creates a new tcp server
+func New(
+ callback CallbackFunc,
+ config *Config,
+) (*Server, error) {
+
+ if len(config.LineDelimiter) == 0 {
+ return nil, fmt.Errorf("empty line delimiter")
+ }
+
+ sf := splitFunc([]byte(config.LineDelimiter))
+ return &Server{
+ config: config,
+ callback: callback,
+ clients: make(map[*client]struct{}, 0),
+ done: make(chan struct{}),
+ splitFunc: sf,
+ log: logp.NewLogger("tcp").With("address", config.Host),
+ }, nil
+}
+
+// Start listen to the TCP socket.
+func (s *Server) Start() error {
+ var err error
+ s.Listener, err = net.Listen("tcp", s.config.Host)
+ if err != nil {
+ return err
+ }
+
+ s.log.Info("Started listening for TCP connection")
+
+ s.wg.Add(1)
+ go func() {
+ defer s.wg.Done()
+ s.run()
+ }()
+ return nil
+}
+
+// Run start and run a new TCP listener to receive new data
+func (s *Server) run() {
+ for {
+ conn, err := s.Listener.Accept()
+ if err != nil {
+ select {
+ case <-s.done:
+ return
+ default:
+ s.log.Debugw("Can not accept the connection", "error", err)
+ continue
+ }
+ }
+
+ client := newClient(
+ conn,
+ s.log,
+ s.callback,
+ s.splitFunc,
+ s.config.MaxMessageSize,
+ s.config.Timeout,
+ )
+
+ s.log.Debugw("New client", "address", conn.RemoteAddr(), "total", s.clientsCount())
+ s.wg.Add(1)
+ go func() {
+ defer logp.Recover("recovering from a tcp client crash")
+ defer s.wg.Done()
+ defer conn.Close()
+
+ s.registerClient(client)
+ defer s.unregisterClient(client)
+
+ err := client.handle()
+ if err != nil {
+ s.log.Debugw("Client error", "error", err)
+ }
+
+ s.log.Debugw("Client disconnected", "address", conn.RemoteAddr(), "total", s.clientsCount())
+ }()
+ }
+}
+
+// Stop stops accepting new incoming TCP connection and close any active clients
+func (s *Server) Stop() {
+ s.log.Info("Stopping TCP server")
+ close(s.done)
+ s.Listener.Close()
+ for _, client := range s.allClients() {
+ client.close()
+ }
+ s.wg.Wait()
+ s.log.Info("TCP server stopped")
+}
+
+func (s *Server) registerClient(client *client) {
+ s.Lock()
+ defer s.Unlock()
+ s.clients[client] = struct{}{}
+}
+
+func (s *Server) unregisterClient(client *client) {
+ s.Lock()
+ defer s.Unlock()
+ delete(s.clients, client)
+}
+
+func (s *Server) allClients() []*client {
+ s.RLock()
+ defer s.RUnlock()
+ currentClients := make([]*client, len(s.clients))
+ idx := 0
+ for client := range s.clients {
+ currentClients[idx] = client
+ idx++
+ }
+ return currentClients
+}
+
+func (s *Server) clientsCount() int {
+ s.RLock()
+ defer s.RUnlock()
+ return len(s.clients)
+}
+
+func splitFunc(lineDelimiter []byte) bufio.SplitFunc {
+ ld := []byte(lineDelimiter)
+ if bytes.Equal(ld, []byte("\n")) {
+ // This will work for most usecases and will also strip \r if present.
+ // CustomDelimiter, need to match completely and the delimiter will be completely removed from
+ // the returned byte slice
+ return bufio.ScanLines
+ }
+ return factoryDelimiter(ld)
+}
diff --git a/filebeat/inputsource/tcp/server_test.go b/filebeat/inputsource/tcp/server_test.go
new file mode 100644
index 000000000000..7aaf200ece8c
--- /dev/null
+++ b/filebeat/inputsource/tcp/server_test.go
@@ -0,0 +1,244 @@
+package tcp
+
+import (
+ "fmt"
+ "math/rand"
+ "net"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/dustin/go-humanize"
+ "github.com/stretchr/testify/assert"
+
+ "github.com/elastic/beats/libbeat/common"
+)
+
+var defaultConfig = Config{
+ LineDelimiter: "\n",
+ Timeout: time.Minute * 5,
+ MaxMessageSize: 20 * humanize.MiByte,
+}
+
+type info struct {
+ message string
+ mt Metadata
+}
+
+func TestErrorOnEmptyLineDelimiter(t *testing.T) {
+ cfg := map[string]interface{}{
+ "line_delimiter": "",
+ }
+
+ c, _ := common.NewConfigFrom(cfg)
+ config := defaultConfig
+ err := c.Unpack(&config)
+ assert.Error(t, err)
+}
+
+func TestReceiveEventsAndMetadata(t *testing.T) {
+ expectedMessages := generateMessages(5, 100)
+ largeMessages := generateMessages(10, 4096)
+
+ tests := []struct {
+ name string
+ cfg map[string]interface{}
+ expectedMessages []string
+ messageSent string
+ }{
+ {
+ name: "NewLine",
+ cfg: map[string]interface{}{},
+ expectedMessages: expectedMessages,
+ messageSent: strings.Join(expectedMessages, "\n"),
+ },
+ {
+ name: "NewLineWithCR",
+ cfg: map[string]interface{}{},
+ expectedMessages: expectedMessages,
+ messageSent: strings.Join(expectedMessages, "\r\n"),
+ },
+ {
+ name: "CustomDelimiter",
+ cfg: map[string]interface{}{
+ "line_delimiter": ";",
+ },
+ expectedMessages: expectedMessages,
+ messageSent: strings.Join(expectedMessages, ";"),
+ },
+ {
+ name: "MultipleCharsCustomDelimiter",
+ cfg: map[string]interface{}{
+ "line_delimiter": "",
+ },
+ expectedMessages: expectedMessages,
+ messageSent: strings.Join(expectedMessages, ""),
+ },
+ {
+ name: "SingleCharCustomDelimiterMessageWithoutBoudaries",
+ cfg: map[string]interface{}{
+ "line_delimiter": ";",
+ },
+ expectedMessages: []string{"hello"},
+ messageSent: "hello",
+ },
+ {
+ name: "MultipleCharCustomDelimiterMessageWithoutBoundaries",
+ cfg: map[string]interface{}{
+ "line_delimiter": "",
+ },
+ expectedMessages: []string{"hello"},
+ messageSent: "hello",
+ },
+ {
+ name: "NewLineMessageWithoutBoundaries",
+ cfg: map[string]interface{}{
+ "line_delimiter": "\n",
+ },
+ expectedMessages: []string{"hello"},
+ messageSent: "hello",
+ },
+ {
+ name: "NewLineLargeMessagePayload",
+ cfg: map[string]interface{}{
+ "line_delimiter": "\n",
+ },
+ expectedMessages: largeMessages,
+ messageSent: strings.Join(largeMessages, "\n"),
+ },
+ {
+ name: "CustomLargeMessagePayload",
+ cfg: map[string]interface{}{
+ "line_delimiter": ";",
+ },
+ expectedMessages: largeMessages,
+ messageSent: strings.Join(largeMessages, ";"),
+ },
+ {
+ name: "MaxReadBufferReached",
+ cfg: map[string]interface{}{},
+ expectedMessages: []string{},
+ messageSent: randomString(900000),
+ },
+ {
+ name: "MaxReadBufferReachedUserConfigured",
+ cfg: map[string]interface{}{
+ "max_read_message": 50000,
+ },
+ expectedMessages: []string{},
+ messageSent: randomString(600000),
+ },
+ }
+
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ ch := make(chan *info, len(test.expectedMessages))
+ defer close(ch)
+ to := func(message []byte, mt Metadata) {
+ ch <- &info{message: string(message), mt: mt}
+ }
+ test.cfg["host"] = "localhost:0"
+ cfg, _ := common.NewConfigFrom(test.cfg)
+ config := defaultConfig
+ err := cfg.Unpack(&config)
+ if !assert.NoError(t, err) {
+ return
+ }
+ server, err := New(to, &config)
+ if !assert.NoError(t, err) {
+ return
+ }
+ err = server.Start()
+ if !assert.NoError(t, err) {
+ return
+ }
+ defer server.Stop()
+
+ conn, err := net.Dial("tcp", server.Listener.Addr().String())
+ assert.NoError(t, err)
+ fmt.Fprint(conn, test.messageSent)
+ conn.Close()
+
+ var events []*info
+
+ for len(events) < len(test.expectedMessages) {
+ select {
+ case event := <-ch:
+ events = append(events, event)
+ default:
+ }
+ }
+
+ for idx, e := range events {
+ assert.Equal(t, test.expectedMessages[idx], e.message)
+ assert.NotNil(t, e.mt.RemoteAddr)
+ }
+ })
+ }
+}
+
+func TestReceiveNewEventsConcurrently(t *testing.T) {
+ workers := 4
+ eventsCount := 100
+ ch := make(chan *info, eventsCount*workers)
+ defer close(ch)
+ to := func(message []byte, mt Metadata) {
+ ch <- &info{message: string(message), mt: mt}
+ }
+ cfg, err := common.NewConfigFrom(map[string]interface{}{"host": ":0"})
+ if !assert.NoError(t, err) {
+ return
+ }
+ config := defaultConfig
+ err = cfg.Unpack(&config)
+ if !assert.NoError(t, err) {
+ return
+ }
+ server, err := New(to, &config)
+ if !assert.NoError(t, err) {
+ return
+ }
+ err = server.Start()
+ if !assert.NoError(t, err) {
+ return
+ }
+ defer server.Stop()
+
+ samples := generateMessages(eventsCount, 1024)
+ for w := 0; w < workers; w++ {
+ go func() {
+ conn, err := net.Dial("tcp", server.Listener.Addr().String())
+ defer conn.Close()
+ assert.NoError(t, err)
+ for _, sample := range samples {
+ fmt.Fprintln(conn, sample)
+ }
+ }()
+ }
+
+ var events []*info
+ for len(events) < eventsCount*workers {
+ select {
+ case event := <-ch:
+ events = append(events, event)
+ default:
+ }
+ }
+}
+
+func randomString(l int) string {
+ charsets := []byte("abcdefghijklmnopqrstuvwzyzABCDEFGHIJKLMNOPQRSTUVWZYZ0123456789")
+ message := make([]byte, l)
+ for i := range message {
+ message[i] = charsets[rand.Intn(len(charsets))]
+ }
+ return string(message)
+}
+
+func generateMessages(c int, l int) []string {
+ messages := make([]string, c)
+ for i := range messages {
+ messages[i] = randomString(l)
+ }
+ return messages
+}
diff --git a/filebeat/tests/system/test_tcp.py b/filebeat/tests/system/test_tcp.py
new file mode 100644
index 000000000000..d6788d164eac
--- /dev/null
+++ b/filebeat/tests/system/test_tcp.py
@@ -0,0 +1,68 @@
+from filebeat import BaseTest
+import socket
+
+
+class Test(BaseTest):
+ """
+ Test filebeat TCP input
+ """
+
+ def test_tcp_with_newline_delimiter(self):
+ """
+ Test TCP input with a new line delimiter
+ """
+ self.send_events_with_delimiter("\n")
+
+ def test_tcp_with_custom_char_delimiter(self):
+ """
+ Test TCP input with a custom single char delimiter
+ """
+ self.send_events_with_delimiter(";")
+
+ def test_tcp_with_custom_word_delimiter(self):
+ """
+ Test TCP input with a custom single char delimiter
+ """
+ self.send_events_with_delimiter("")
+
+ def send_events_with_delimiter(self, delimiter):
+ host = "127.0.0.1"
+ port = 8080
+ input_raw = """
+- type: tcp
+ host: "{}:{}"
+ enabled: true
+"""
+
+ # Use default of \n and stripping \r
+ if delimiter is not "":
+ input_raw += "\n line_delimiter: {}".format(delimiter)
+
+ input_raw = input_raw.format(host, port)
+
+ self.render_config_template(
+ input_raw=input_raw,
+ inputs=False,
+ )
+
+ filebeat = self.start_beat()
+
+ self.wait_until(lambda: self.log_contains("Started listening for TCP connection"))
+
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # TCP
+ sock.connect((host, port))
+
+ for n in range(0, 2):
+ sock.send("Hello World: " + str(n) + delimiter)
+
+ self.wait_until(lambda: self.output_count(lambda x: x >= 2))
+
+ filebeat.check_kill_and_wait()
+
+ output = self.read_output()
+
+ assert len(output) == 2
+ assert output[0]["prospector.type"] == "tcp"
+ assert output[0]["input.type"] == "tcp"
+
+ sock.close()