Skip to content

Commit

Permalink
[Filebeat] Update docs for unix sockets (#18009)
Browse files Browse the repository at this point in the history
* update docs for unix sockets

* Update filebeat/docs/inputs/input-unix.asciidoc

Co-Authored-By: Andrew Kroh <[email protected]>

* Add socket cleanup code, and socket ownership modification

* rearrange imports

* updated docs

* Switch to group chown and chmod only

* Fix docs

* Bypass refusal check for windows due to Windows unix socket buf for FileMode

Co-authored-by: Andrew Kroh <[email protected]>
  • Loading branch information
Andrew Stucki and andrewkroh authored Apr 28, 2020
1 parent 3df0466 commit 0524005
Show file tree
Hide file tree
Showing 10 changed files with 236 additions and 0 deletions.
16 changes: 16 additions & 0 deletions filebeat/docs/inputs/input-common-unix-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,22 @@ The maximum size of the message received over the socket. The default is `20MiB`

The path to the Unix socket that will receive event streams.

[float]
[id="{beatname_lc}-input-{type}-unix-group"]
==== `group`

The group ownership of the Unix socket that will be created by Filebeat.
The default is the primary group name for the user Filebeat is running as.
This option is ignored on Windows.

[float]
[id="{beatname_lc}-input-{type}-unix-mode"]
==== `mode`

The file mode of the Unix socket that will be created by Filebeat. This is
expected to be a file mode as an octal string. The default value is the system
default (generally `0755`).

[float]
[id="{beatname_lc}-input-{type}-unix-line-delimiter"]
==== `line_delimiter`
Expand Down
2 changes: 2 additions & 0 deletions filebeat/docs/inputs/input-syslog.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ include::../inputs/input-common-tcp-options.asciidoc[]

===== Protocol `unix`:

beta[]

include::../inputs/input-common-unix-options.asciidoc[]

[id="{beatname_lc}-input-{type}-common-options"]
Expand Down
35 changes: 35 additions & 0 deletions filebeat/docs/inputs/input-unix.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
:type: unix

[id="{beatname_lc}-input-{type}"]
=== Unix input

beta[]

++++
<titleabbrev>Unix</titleabbrev>
++++

Use the `unix` input to read events over a stream-oriented Unix domain socket.

Example configuration:

["source","yaml",subs="attributes"]
----
{beatname_lc}.inputs:
- type: unix
max_message_size: 10MiB
path: "/var/run/filebeat.sock"
----


==== Configuration options

The `unix` input supports the following configuration options plus the
<<{beatname_lc}-input-{type}-common-options>> described later.

include::../inputs/input-common-unix-options.asciidoc[]

[id="{beatname_lc}-input-{type}-common-options"]
include::../inputs/input-common-options.asciidoc[]

:type!:
3 changes: 3 additions & 0 deletions filebeat/input/syslog/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/elastic/beats/v7/filebeat/inputsource/udp"
"github.com/elastic/beats/v7/filebeat/inputsource/unix"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/logp"
)

Expand Down Expand Up @@ -98,6 +99,8 @@ func factory(

return tcp.New(&config.Config, factory)
case unix.Name:
cfgwarn.Beta("Syslog Unix socket support is beta.")

config := defaultUnix
if err := cfg.Unpack(&config); err != nil {
return nil, err
Expand Down
2 changes: 2 additions & 0 deletions filebeat/input/unix/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/elastic/beats/v7/filebeat/inputsource/unix"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/logp"
)

Expand All @@ -56,6 +57,7 @@ func NewInput(
connector channel.Connector,
context input.Context,
) (input.Input, error) {
cfgwarn.Beta("Unix socket support is beta.")

out, err := connector.ConnectWith(cfg, beat.ClientConfig{
Processing: beat.ProcessingConfig{
Expand Down
2 changes: 2 additions & 0 deletions filebeat/inputsource/unix/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const Name = "unix"
// Config exposes the unix configuration.
type Config struct {
Path string `config:"path"`
Group *string `config:"group"`
Mode *string `config:"mode"`
Timeout time.Duration `config:"timeout" validate:"nonzero,positive"`
MaxMessageSize cfgtype.ByteSize `config:"max_message_size" validate:"nonzero,positive"`
MaxConnections int `config:"max_connections"`
Expand Down
84 changes: 84 additions & 0 deletions filebeat/inputsource/unix/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,16 @@ package unix
import (
"fmt"
"net"
"os"
"os/user"
"runtime"
"strconv"

"github.com/pkg/errors"
"golang.org/x/net/netutil"

"github.com/elastic/beats/v7/filebeat/inputsource/common"
"github.com/elastic/beats/v7/libbeat/logp"
)

// Server represent a unix server
Expand Down Expand Up @@ -55,13 +61,91 @@ func New(
}

func (s *Server) createServer() (net.Listener, error) {
if err := s.cleanupStaleSocket(); err != nil {
return nil, err
}

l, err := net.Listen("unix", s.config.Path)
if err != nil {
return nil, err
}

if err := s.setSocketOwnership(); err != nil {
return nil, err
}

if err := s.setSocketMode(); err != nil {
return nil, err
}

if s.config.MaxConnections > 0 {
return netutil.LimitListener(l, s.config.MaxConnections), nil
}
return l, nil
}

func (s *Server) cleanupStaleSocket() error {
path := s.config.Path
info, err := os.Lstat(path)
if err != nil {
// If the file does not exist, then the cleanup can be considered successful.
if os.IsNotExist(err) {
return nil
}
return errors.Wrapf(err, "cannot lstat unix socket file at location %s", path)
}

if runtime.GOOS != "windows" {
// see https://github.com/golang/go/issues/33357 for context on Windows socket file attributes bug
if info.Mode()&os.ModeSocket == 0 {
return fmt.Errorf("refusing to remove file at location %s, it is not a socket", path)
}
}

if err := os.Remove(path); err != nil {
return errors.Wrapf(err, "cannot remove existing unix socket file at location %s", path)
}

return nil
}

func (s *Server) setSocketOwnership() error {
if s.config.Group != nil {
if runtime.GOOS == "windows" {
logp.NewLogger("unix").Warn("windows does not support the 'group' configuration option, ignoring")
return nil
}
g, err := user.LookupGroup(*s.config.Group)
if err != nil {
return err
}
gid, err := strconv.Atoi(g.Gid)
if err != nil {
return err
}
return os.Chown(s.config.Path, -1, gid)
}
return nil
}

func (s *Server) setSocketMode() error {
if s.config.Mode != nil {
mode, err := parseFileMode(*s.config.Mode)
if err != nil {
return err
}
return os.Chmod(s.config.Path, mode)
}
return nil
}

func parseFileMode(mode string) (os.FileMode, error) {
parsed, err := strconv.ParseUint(mode, 8, 32)
if err != nil {
return 0, err
}
if parsed > 0777 {
return 0, errors.New("invalid file mode")
}
return os.FileMode(parsed), nil
}
90 changes: 90 additions & 0 deletions filebeat/inputsource/unix/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ import (
"math/rand"
"net"
"os"
"os/user"
"path/filepath"
"runtime"
"strconv"
"strings"
"testing"
"time"
Expand All @@ -35,6 +38,7 @@ import (
"github.com/elastic/beats/v7/filebeat/inputsource"
netcommon "github.com/elastic/beats/v7/filebeat/inputsource/common"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/file"
"github.com/elastic/beats/v7/libbeat/logp"
)

Expand Down Expand Up @@ -207,6 +211,92 @@ func TestReceiveEventsAndMetadata(t *testing.T) {
}
}

func TestSocketOwnershipAndMode(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("changing socket ownership is only supported on non-windows")
return
}

groups, err := os.Getgroups()
require.NoError(t, err)

if len(groups) <= 1 {
t.Skip("no group that we can change to")
return
}

group, err := user.LookupGroupId(strconv.Itoa(groups[1]))
require.NoError(t, err)

path := filepath.Join(os.TempDir(), "test.sock")
cfg, _ := common.NewConfigFrom(map[string]interface{}{
"path": path,
"group": group.Name,
"mode": "0740",
})
config := defaultConfig
err = cfg.Unpack(&config)
require.NoError(t, err)

factory := netcommon.SplitHandlerFactory(netcommon.FamilyUnix, logp.NewLogger("test"), MetadataCallback, nil, netcommon.SplitFunc([]byte("\n")))
server, err := New(&config, factory)
require.NoError(t, err)
err = server.Start()
require.NoError(t, err)
defer server.Stop()

info, err := file.Lstat(path)
require.NoError(t, err)
require.NotEqual(t, 0, info.Mode()&os.ModeSocket)
require.Equal(t, os.FileMode(0740), info.Mode().Perm())
gid, err := info.GID()
require.NoError(t, err)
require.Equal(t, group.Gid, strconv.Itoa(gid))
}

func TestSocketCleanup(t *testing.T) {
path := filepath.Join(os.TempDir(), "test.sock")
mockStaleSocket, err := net.Listen("unix", path)
require.NoError(t, err)
defer mockStaleSocket.Close()

cfg, _ := common.NewConfigFrom(map[string]interface{}{
"path": path,
})
config := defaultConfig
require.NoError(t, cfg.Unpack(&config))
factory := netcommon.SplitHandlerFactory(netcommon.FamilyUnix, logp.NewLogger("test"), MetadataCallback, nil, netcommon.SplitFunc([]byte("\n")))
server, err := New(&config, factory)
require.NoError(t, err)
err = server.Start()
require.NoError(t, err)
server.Stop()
}

func TestSocketCleanupRefusal(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("skipping due to windows FileAttributes bug https://github.com/golang/go/issues/33357")
return
}
path := filepath.Join(os.TempDir(), "test.sock")
f, err := os.Create(path)
require.NoError(t, err)
require.NoError(t, f.Close())
defer os.Remove(path)

cfg, _ := common.NewConfigFrom(map[string]interface{}{
"path": path,
})
config := defaultConfig
require.NoError(t, cfg.Unpack(&config))
factory := netcommon.SplitHandlerFactory(netcommon.FamilyUnix, logp.NewLogger("test"), MetadataCallback, nil, netcommon.SplitFunc([]byte("\n")))
server, err := New(&config, factory)
require.NoError(t, err)
err = server.Start()
require.Error(t, err)
require.Contains(t, err.Error(), "refusing to remove file at location")
}

func TestReceiveNewEventsConcurrently(t *testing.T) {
workers := 4
eventsCount := 100
Expand Down
1 change: 1 addition & 0 deletions libbeat/common/seccomp/policy_linux_386.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func init() {
"access",
"brk",
"chmod",
"chown",
"clock_gettime",
"clone",
"close",
Expand Down
1 change: 1 addition & 0 deletions libbeat/common/seccomp/policy_linux_amd64.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func init() {
"bind",
"brk",
"chmod",
"chown",
"clock_gettime",
"clone",
"close",
Expand Down

0 comments on commit 0524005

Please sign in to comment.