Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat] Unix stream socket input source #17492

Merged
merged 17 commits into from
Apr 21, 2020
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Enhance `elasticsearch/deprecation` fileset to handle ECS-compatible logs emitted by Elasticsearch. {issue}17715[17715] {pull}17728[17728]
- Enhance `elasticsearch/slowlog` fileset to handle ECS-compatible logs emitted by Elasticsearch. {issue}17715[17715] {pull}17729[17729]
- Improve ECS categorization field mappings in misp module. {issue}16026[16026] {pull}17344[17344]
- Added Unix stream socket support as an input source and a syslog input source. {pull}17492[17492]

*Heartbeat*

Expand Down
37 changes: 37 additions & 0 deletions filebeat/docs/inputs/input-common-unix-options.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
//////////////////////////////////////////////////////////////////////////
//// This content is shared by Filebeat inputs that use the Unix inputsource
//// If you add IDs to sections, make sure you use attributes to create
//// unique IDs for each input that includes this file. Use the format:
//// [id="{beatname_lc}-input-{type}-option-name"]
//////////////////////////////////////////////////////////////////////////
[float]
[id="{beatname_lc}-input-{type}-unix-max-message-size"]
==== `max_message_size`

The maximum size of the message received over the socket. The default is `20MiB`.

[float]
[id="{beatname_lc}-input-{type}-unix-path"]
==== `path`

The path to the Unix socket that will receive event streams.

[float]
[id="{beatname_lc}-input-{type}-unix-line-delimiter"]
==== `line_delimiter`

Specify the characters used to split the incoming events. The default is '\n'.

[float]
[id="{beatname_lc}-input-{type}-unix-max-connections"]
==== `max_connections`

The at most number of connections to accept at any given point in time.

[float]
[id="{beatname_lc}-input-{type}-unix-timeout"]
==== `timeout`

The number of seconds of inactivity before a connection is closed. The default is `300s`.

See <<configuration-ssl>> for more information.
14 changes: 13 additions & 1 deletion filebeat/docs/inputs/input-syslog.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<titleabbrev>Syslog</titleabbrev>
++++

Use the `syslog` input to read events over TCP or UDP, this input will parse BSD (rfc3164)
Use the `syslog` input to read events over TCP, UDP, or a Unix stream socket, this input will parse BSD (rfc3164)
event and some variant.

Example configurations:
Expand All @@ -28,6 +28,14 @@ Example configurations:
host: "localhost:9000"
----

["source","yaml",subs="attributes"]
----
{beatname_lc}.inputs:
- type: syslog
protocol.unix:
path: "/path/to/syslog.sock"
----

==== Configuration options

The `syslog` input supports protocol specific configuration options plus the
Expand All @@ -41,6 +49,10 @@ include::../inputs/input-common-udp-options.asciidoc[]

include::../inputs/input-common-tcp-options.asciidoc[]

===== Protocol `unix`:

include::../inputs/input-common-unix-options.asciidoc[]

[id="{beatname_lc}-input-{type}-common-options"]
include::../inputs/input-common-options.asciidoc[]

Expand Down
1 change: 1 addition & 0 deletions filebeat/include/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 35 additions & 2 deletions filebeat/input/syslog/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ import (

"github.com/elastic/beats/v7/filebeat/harvester"
"github.com/elastic/beats/v7/filebeat/inputsource"
netcommon "github.com/elastic/beats/v7/filebeat/inputsource/common"
"github.com/elastic/beats/v7/filebeat/inputsource/tcp"
"github.com/elastic/beats/v7/filebeat/inputsource/udp"
"github.com/elastic/beats/v7/filebeat/inputsource/unix"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
)

type config struct {
Expand All @@ -54,6 +57,19 @@ var defaultTCP = syslogTCP{
LineDelimiter: "\n",
}

type syslogUnix struct {
unix.Config `config:",inline"`
LineDelimiter string `config:"line_delimiter" validate:"nonzero"`
}

var defaultUnix = syslogUnix{
Config: unix.Config{
Timeout: time.Minute * 5,
MaxMessageSize: 20 * humanize.MiByte,
},
LineDelimiter: "\n",
}

var defaultUDP = udp.Config{
MaxMessageSize: 10 * humanize.KiByte,
Timeout: time.Minute * 5,
Expand All @@ -72,14 +88,31 @@ func factory(
return nil, err
}

splitFunc := tcp.SplitFunc([]byte(config.LineDelimiter))
splitFunc := netcommon.SplitFunc([]byte(config.LineDelimiter))
if splitFunc == nil {
return nil, fmt.Errorf("error creating splitFunc from delimiter %s", config.LineDelimiter)
}

factory := tcp.SplitHandlerFactory(nf, splitFunc)
logger := logp.NewLogger("input.syslog.tcp").With("address", config.Config.Host)
factory := netcommon.SplitHandlerFactory(netcommon.FamilyTCP, logger, tcp.MetadataCallback, nf, splitFunc)

return tcp.New(&config.Config, factory)
case unix.Name:
config := defaultUnix
if err := cfg.Unpack(&config); err != nil {
return nil, err
}

splitFunc := netcommon.SplitFunc([]byte(config.LineDelimiter))
if splitFunc == nil {
return nil, fmt.Errorf("error creating splitFunc from delimiter %s", config.LineDelimiter)
}

logger := logp.NewLogger("input.syslog.unix").With("path", config.Config.Path)
factory := netcommon.SplitHandlerFactory(netcommon.FamilyUnix, logger, unix.MetadataCallback, nf, splitFunc)

return unix.New(&config.Config, factory)

case udp.Name:
config := defaultUDP
if err := cfg.Unpack(&config); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion filebeat/input/syslog/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,9 @@ func newBeatEvent(timestamp time.Time, metadata inputsource.NetworkMetadata, fie
},
Fields: fields,
}
event.Fields.Put("log.source.address", metadata.RemoteAddr.String())
if metadata.RemoteAddr != nil {
event.Fields.Put("log.source.address", metadata.RemoteAddr.String())
}
return event
}

Expand Down
18 changes: 10 additions & 8 deletions filebeat/input/tcp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/v7/filebeat/harvester"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/filebeat/inputsource"
netcommon "github.com/elastic/beats/v7/filebeat/inputsource/common"
"github.com/elastic/beats/v7/filebeat/inputsource/tcp"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
Expand All @@ -41,7 +42,7 @@ func init() {

// Input for TCP connection
type Input struct {
sync.Mutex
mutex sync.Mutex
server *tcp.Server
started bool
outlet channel.Outleter
Expand Down Expand Up @@ -78,12 +79,13 @@ func NewInput(
forwarder.Send(event)
}

splitFunc := tcp.SplitFunc([]byte(config.LineDelimiter))
splitFunc := netcommon.SplitFunc([]byte(config.LineDelimiter))
if splitFunc == nil {
return nil, fmt.Errorf("unable to create splitFunc for delimiter %s", config.LineDelimiter)
}

factory := tcp.SplitHandlerFactory(cb, splitFunc)
logger := logp.NewLogger("input.tcp").With("address", config.Config.Host)
factory := netcommon.SplitHandlerFactory(netcommon.FamilyTCP, logger, tcp.MetadataCallback, cb, splitFunc)

server, err := tcp.New(&config.Config, factory)
if err != nil {
Expand All @@ -95,14 +97,14 @@ func NewInput(
started: false,
outlet: out,
config: &config,
log: logp.NewLogger("tcp input").With("address", config.Config.Host),
log: logger,
}, nil
}

// Run start a TCP input
func (p *Input) Run() {
p.Lock()
defer p.Unlock()
p.mutex.Lock()
defer p.mutex.Unlock()

if !p.started {
p.log.Info("Starting TCP input")
Expand All @@ -117,8 +119,8 @@ func (p *Input) Run() {
// Stop stops TCP server
func (p *Input) Stop() {
defer p.outlet.Close()
p.Lock()
defer p.Unlock()
p.mutex.Lock()
defer p.mutex.Unlock()

p.log.Info("Stopping TCP input")
p.server.Stop()
Expand Down
45 changes: 45 additions & 0 deletions filebeat/input/unix/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package unix

import (
"time"

"github.com/dustin/go-humanize"

"github.com/elastic/beats/v7/filebeat/harvester"
"github.com/elastic/beats/v7/filebeat/inputsource/unix"
)

type config struct {
unix.Config `config:",inline"`
harvester.ForwarderConfig `config:",inline"`

LineDelimiter string `config:"line_delimiter" validate:"nonzero"`
}

var defaultConfig = config{
ForwarderConfig: harvester.ForwarderConfig{
Type: "unix",
},
Config: unix.Config{
Timeout: time.Minute * 5,
MaxMessageSize: 20 * humanize.MiByte,
},
LineDelimiter: "\n",
}
Loading