From a56015cf767033b157e8371a6b32f3b32e54a4a2 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Thu, 8 Oct 2020 18:45:34 +0200 Subject: [PATCH] Update system/socket dataset to support config reloading --- CHANGELOG.next.asciidoc | 1 + .../auditbeat/module/system/socket/config.go | 23 +++++++++++-- .../module/system/socket/guess/creds.go | 2 +- .../module/system/socket/guess/cskxmit6.go | 2 +- .../module/system/socket/guess/deref.go | 2 +- .../module/system/socket/guess/inetsock.go | 2 +- .../module/system/socket/guess/inetsock6.go | 2 +- .../module/system/socket/guess/inetsockaf.go | 2 +- .../module/system/socket/guess/iplocalout.go | 2 +- .../module/system/socket/guess/registry.go | 24 ++++++++------ .../module/system/socket/guess/skbuff.go | 6 ++-- .../module/system/socket/guess/sockaddrin.go | 3 +- .../module/system/socket/guess/sockaddrin6.go | 3 +- .../module/system/socket/guess/socketsk.go | 2 +- .../module/system/socket/guess/syscallargs.go | 6 ++-- .../system/socket/guess/tcpsendmsgargs.go | 2 +- .../system/socket/guess/tcpsendmsgsk.go | 2 +- .../module/system/socket/guess/udpsendmsg.go | 2 +- .../module/system/socket/socket_linux.go | 32 +++++++++++++++---- 19 files changed, 82 insertions(+), 38 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b77935ef89b..f0896b93c3f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -232,6 +232,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - system/socket: Fix kprobe grouping to allow running more than one instance. {pull}20325[20325] - system/socket: Fixed a crash due to concurrent map read and write. {issue}21192[21192] {pull}21690[21690] - file_integrity: stop monitoring excluded paths {issue}21278[21278] {pull}21282[21282] +- system/socket: Fixed start failure when run under config reloader. {issue}20851[20851] {pull}21693[21693] *Filebeat* diff --git a/x-pack/auditbeat/module/system/socket/config.go b/x-pack/auditbeat/module/system/socket/config.go index 55ac5b4f907..79413f7649c 100644 --- a/x-pack/auditbeat/module/system/socket/config.go +++ b/x-pack/auditbeat/module/system/socket/config.go @@ -4,7 +4,10 @@ package socket -import "time" +import ( + "reflect" + "time" +) // Config defines this metricset's configuration options. type Config struct { @@ -64,11 +67,27 @@ type Config struct { EnableIPv6 *bool `config:"socket.enable_ipv6"` } -// Validate validates the host metricset config. +// Validate validates the socket metricset config. func (c *Config) Validate() error { return nil } +// Equals compares two Config objects +func (c *Config) Equals(other Config) bool { + // reflect.DeepEquals() doesn't compare pointed-to values, so strip + // all pointers and then compare them manually. + simpler := [2]Config{*c, other} + for idx := range simpler { + simpler[idx].EnableIPv6 = nil + simpler[idx].TraceFSPath = nil + } + return reflect.DeepEqual(simpler[0], simpler[1]) && + (c.EnableIPv6 == nil) == (other.EnableIPv6 == nil) && + (c.EnableIPv6 == nil || *c.EnableIPv6 == *other.EnableIPv6) && + (c.TraceFSPath == nil) == (other.TraceFSPath == nil) && + (c.TraceFSPath == nil || *c.TraceFSPath == *other.TraceFSPath) +} + var defaultConfig = Config{ PerfQueueSize: 4096, LostQueueSize: 128, diff --git a/x-pack/auditbeat/module/system/socket/guess/creds.go b/x-pack/auditbeat/module/system/socket/guess/creds.go index ad35ed06e93..7e3bd4791d9 100644 --- a/x-pack/auditbeat/module/system/socket/guess/creds.go +++ b/x-pack/auditbeat/module/system/socket/guess/creds.go @@ -51,7 +51,7 @@ const ( ) func init() { - if err := Registry.AddGuess(&guessStructCreds{}); err != nil { + if err := Registry.AddGuess(func() Guesser { return &guessStructCreds{} }); err != nil { panic(err) } } diff --git a/x-pack/auditbeat/module/system/socket/guess/cskxmit6.go b/x-pack/auditbeat/module/system/socket/guess/cskxmit6.go index b17efafabaa..aae0e2fd9b6 100644 --- a/x-pack/auditbeat/module/system/socket/guess/cskxmit6.go +++ b/x-pack/auditbeat/module/system/socket/guess/cskxmit6.go @@ -34,7 +34,7 @@ import ( */ func init() { - if err := Registry.AddGuess(&guessInet6CskXmit{}); err != nil { + if err := Registry.AddGuess(func() Guesser { return &guessInet6CskXmit{} }); err != nil { panic(err) } } diff --git a/x-pack/auditbeat/module/system/socket/guess/deref.go b/x-pack/auditbeat/module/system/socket/guess/deref.go index a019b7cf7b9..648580e5cc0 100644 --- a/x-pack/auditbeat/module/system/socket/guess/deref.go +++ b/x-pack/auditbeat/module/system/socket/guess/deref.go @@ -28,7 +28,7 @@ import ( */ func init() { - if err := Registry.AddGuess(&guessDeref{}); err != nil { + if err := Registry.AddGuess(func() Guesser { return &guessDeref{} }); err != nil { panic(err) } } diff --git a/x-pack/auditbeat/module/system/socket/guess/inetsock.go b/x-pack/auditbeat/module/system/socket/guess/inetsock.go index cc08fa79bf4..57a133d0056 100644 --- a/x-pack/auditbeat/module/system/socket/guess/inetsock.go +++ b/x-pack/auditbeat/module/system/socket/guess/inetsock.go @@ -35,7 +35,7 @@ import ( // matched the remote address. This is used by guess_inet_sock6. func init() { - if err := Registry.AddGuess(&guessInetSockIPv4{}); err != nil { + if err := Registry.AddGuess(func() Guesser { return &guessInetSockIPv4{} }); err != nil { panic(err) } } diff --git a/x-pack/auditbeat/module/system/socket/guess/inetsock6.go b/x-pack/auditbeat/module/system/socket/guess/inetsock6.go index c76b47e3d19..37cd56b06fb 100644 --- a/x-pack/auditbeat/module/system/socket/guess/inetsock6.go +++ b/x-pack/auditbeat/module/system/socket/guess/inetsock6.go @@ -103,7 +103,7 @@ import ( const inetSockDumpSize = 8 * 256 func init() { - if err := Registry.AddGuess(&guessInetSockIPv6{}); err != nil { + if err := Registry.AddGuess(func() Guesser { return &guessInetSockIPv6{} }); err != nil { panic(err) } } diff --git a/x-pack/auditbeat/module/system/socket/guess/inetsockaf.go b/x-pack/auditbeat/module/system/socket/guess/inetsockaf.go index ec8075060d4..2e66eb2a724 100644 --- a/x-pack/auditbeat/module/system/socket/guess/inetsockaf.go +++ b/x-pack/auditbeat/module/system/socket/guess/inetsockaf.go @@ -45,7 +45,7 @@ import ( const inetSockAfDumpSize = 8 * 16 func init() { - if err := Registry.AddGuess(&guessInetSockFamily{}); err != nil { + if err := Registry.AddGuess(func() Guesser { return &guessInetSockFamily{} }); err != nil { panic(err) } } diff --git a/x-pack/auditbeat/module/system/socket/guess/iplocalout.go b/x-pack/auditbeat/module/system/socket/guess/iplocalout.go index 170d2bf6885..d78140bda13 100644 --- a/x-pack/auditbeat/module/system/socket/guess/iplocalout.go +++ b/x-pack/auditbeat/module/system/socket/guess/iplocalout.go @@ -39,7 +39,7 @@ const ( ) func init() { - if err := Registry.AddGuess(&guessIPLocalOut{}); err != nil { + if err := Registry.AddGuess(func() Guesser { return &guessIPLocalOut{} }); err != nil { panic(err) } } diff --git a/x-pack/auditbeat/module/system/socket/guess/registry.go b/x-pack/auditbeat/module/system/socket/guess/registry.go index 16c583cbb9e..66971a1f1d5 100644 --- a/x-pack/auditbeat/module/system/socket/guess/registry.go +++ b/x-pack/auditbeat/module/system/socket/guess/registry.go @@ -8,29 +8,33 @@ package guess import "fmt" -// Registry serves as a registration point for guesses. -var Registry = Register{ - guesses: make(map[string]Guesser), -} +// GuesserFactory is a factory function for guesses. +type GuesserFactory func() Guesser // Register stores the registered guesses. type Register struct { - guesses map[string]Guesser + factories map[string]GuesserFactory +} + +// Registry serves as a registration point for guesses. +var Registry = Register{ + factories: make(map[string]GuesserFactory), } // AddGuess registers a new guess. -func (r *Register) AddGuess(guess Guesser) error { - if _, found := r.guesses[guess.Name()]; found { +func (r *Register) AddGuess(factory GuesserFactory) error { + guess := factory() + if _, found := r.factories[guess.Name()]; found { return fmt.Errorf("guess %s is duplicated", guess.Name()) } - r.guesses[guess.Name()] = guess + r.factories[guess.Name()] = factory return nil } // GetList returns a list of registered guesses. func (r *Register) GetList() (list []Guesser) { - for _, guess := range r.guesses { - list = append(list, guess) + for _, factory := range r.factories { + list = append(list, factory()) } return list } diff --git a/x-pack/auditbeat/module/system/socket/guess/skbuff.go b/x-pack/auditbeat/module/system/socket/guess/skbuff.go index 78a73de1187..9960c036465 100644 --- a/x-pack/auditbeat/module/system/socket/guess/skbuff.go +++ b/x-pack/auditbeat/module/system/socket/guess/skbuff.go @@ -45,13 +45,13 @@ import ( const maxSafePayload = 508 func init() { - if err := Registry.AddGuess(&guessSkBuffLen{}); err != nil { + if err := Registry.AddGuess(func() Guesser { return &guessSkBuffLen{} }); err != nil { panic(err) } - if err := Registry.AddGuess(&guessSkBuffProto{}); err != nil { + if err := Registry.AddGuess(func() Guesser { return &guessSkBuffProto{} }); err != nil { panic(err) } - if err := Registry.AddGuess(&guessSkBuffDataPtr{}); err != nil { + if err := Registry.AddGuess(func() Guesser { return &guessSkBuffDataPtr{} }); err != nil { panic(err) } } diff --git a/x-pack/auditbeat/module/system/socket/guess/sockaddrin.go b/x-pack/auditbeat/module/system/socket/guess/sockaddrin.go index c9df8356472..356442b1d86 100644 --- a/x-pack/auditbeat/module/system/socket/guess/sockaddrin.go +++ b/x-pack/auditbeat/module/system/socket/guess/sockaddrin.go @@ -29,8 +29,7 @@ import ( */ func init() { - if err := Registry.AddGuess( - &guessSockaddrIn{}); err != nil { + if err := Registry.AddGuess(func() Guesser { return &guessSockaddrIn{} }); err != nil { panic(err) } } diff --git a/x-pack/auditbeat/module/system/socket/guess/sockaddrin6.go b/x-pack/auditbeat/module/system/socket/guess/sockaddrin6.go index ffed4c577e3..8d1d0b15b25 100644 --- a/x-pack/auditbeat/module/system/socket/guess/sockaddrin6.go +++ b/x-pack/auditbeat/module/system/socket/guess/sockaddrin6.go @@ -29,8 +29,7 @@ import ( */ func init() { - if err := Registry.AddGuess( - &guessSockaddrIn6{}); err != nil { + if err := Registry.AddGuess(func() Guesser { return &guessSockaddrIn6{} }); err != nil { panic(err) } } diff --git a/x-pack/auditbeat/module/system/socket/guess/socketsk.go b/x-pack/auditbeat/module/system/socket/guess/socketsk.go index e084c82f1a4..072eba6b66b 100644 --- a/x-pack/auditbeat/module/system/socket/guess/socketsk.go +++ b/x-pack/auditbeat/module/system/socket/guess/socketsk.go @@ -29,7 +29,7 @@ import ( // "SOCKET_SOCK": 32 func init() { - if err := Registry.AddGuess(&guessSocketSock{}); err != nil { + if err := Registry.AddGuess(func() Guesser { return &guessSocketSock{} }); err != nil { panic(err) } } diff --git a/x-pack/auditbeat/module/system/socket/guess/syscallargs.go b/x-pack/auditbeat/module/system/socket/guess/syscallargs.go index 563ec9e1355..44796b73fa9 100644 --- a/x-pack/auditbeat/module/system/socket/guess/syscallargs.go +++ b/x-pack/auditbeat/module/system/socket/guess/syscallargs.go @@ -31,8 +31,10 @@ import ( */ func init() { - if err := Registry.AddGuess(&guessSyscallArgs{ - expected: [2]uintptr{^uintptr(0x11111111), ^uintptr(0x22222222)}, + if err := Registry.AddGuess(func() Guesser { + return &guessSyscallArgs{ + expected: [2]uintptr{^uintptr(0x11111111), ^uintptr(0x22222222)}, + } }); err != nil { panic(err) } diff --git a/x-pack/auditbeat/module/system/socket/guess/tcpsendmsgargs.go b/x-pack/auditbeat/module/system/socket/guess/tcpsendmsgargs.go index f0382ad3ed2..60bc66f7f91 100644 --- a/x-pack/auditbeat/module/system/socket/guess/tcpsendmsgargs.go +++ b/x-pack/auditbeat/module/system/socket/guess/tcpsendmsgargs.go @@ -24,7 +24,7 @@ import ( // TCP_SENDMSG_LEN : +4(%sp) func init() { - if err := Registry.AddGuess(&guessTCPSendMsg{}); err != nil { + if err := Registry.AddGuess(func() Guesser { return &guessTCPSendMsg{} }); err != nil { panic(err) } } diff --git a/x-pack/auditbeat/module/system/socket/guess/tcpsendmsgsk.go b/x-pack/auditbeat/module/system/socket/guess/tcpsendmsgsk.go index 0e3ede37b54..09004388ada 100644 --- a/x-pack/auditbeat/module/system/socket/guess/tcpsendmsgsk.go +++ b/x-pack/auditbeat/module/system/socket/guess/tcpsendmsgsk.go @@ -28,7 +28,7 @@ import ( // TCP_SENDMSG_SOCK : %di func init() { - if err := Registry.AddGuess(&guessTcpSendmsgSock{}); err != nil { + if err := Registry.AddGuess(func() Guesser { return &guessTcpSendmsgSock{} }); err != nil { panic(err) } } diff --git a/x-pack/auditbeat/module/system/socket/guess/udpsendmsg.go b/x-pack/auditbeat/module/system/socket/guess/udpsendmsg.go index f381b5ba95e..de5889d9312 100644 --- a/x-pack/auditbeat/module/system/socket/guess/udpsendmsg.go +++ b/x-pack/auditbeat/module/system/socket/guess/udpsendmsg.go @@ -28,7 +28,7 @@ import ( // UDP_SENDMSG_MSG: $stack3 func init() { - if err := Registry.AddGuess(&guessUDPSendMsg{}); err != nil { + if err := Registry.AddGuess(func() Guesser { return &guessUDPSendMsg{} }); err != nil { panic(err) } } diff --git a/x-pack/auditbeat/module/system/socket/socket_linux.go b/x-pack/auditbeat/module/system/socket/socket_linux.go index 11f8a22289e..00879ce6152 100644 --- a/x-pack/auditbeat/module/system/socket/socket_linux.go +++ b/x-pack/auditbeat/module/system/socket/socket_linux.go @@ -14,6 +14,7 @@ import ( "sort" "strconv" "strings" + "sync" "sync/atomic" "syscall" "time" @@ -73,6 +74,7 @@ type MetricSet struct { mountedFS *mountPoint isDebug bool isDetailed bool + terminated sync.WaitGroup } func init() { @@ -86,20 +88,38 @@ func init() { } } +var currentInstance *MetricSet + // New constructs a new MetricSet. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - cfgwarn.Beta("The %s dataset is beta.", fullName) - config := defaultConfig if err := base.Module().UnpackConfig(&config); err != nil { return nil, errors.Wrapf(err, "failed to unpack the %s config", fullName) } + if currentInstance != nil { + // Do not instantiate a new dataset if the config hasn't changed. + // This is necessary when run under config reloader even though the + // reloader itself already checks the config for changes, because + // the first time it runs it will allocate two consecutive instances + // (one for checking the config, one for running). This saves + // running the guesses twice on startup. + if config.Equals(currentInstance.config) { + return currentInstance, nil + } + currentInstance.terminated.Wait() + } + var err error + currentInstance, err = newSocketMetricset(config, base) + return currentInstance, err +} + +func newSocketMetricset(config Config, base mb.BaseMetricSet) (*MetricSet, error) { + cfgwarn.Beta("The %s dataset is beta.", fullName) logger := logp.NewLogger(metricsetName) sniffer, err := dns.NewSniffer(base, logger) if err != nil { return nil, errors.Wrap(err, "unable to create DNS sniffer") } - ms := &MetricSet{ SystemMetricSet: system.NewSystemMetricSet(base), templateVars: make(common.MapStr), @@ -110,10 +130,9 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { isDetailed: logp.HasSelector(detailSelector), sniffer: sniffer, } - // Setup the metricset before Run() so that startup can be halted in case of // error. - if err := ms.Setup(); err != nil { + if err = ms.Setup(); err != nil { return nil, errors.Wrapf(err, "%s dataset setup failed", fullName) } return ms, nil @@ -121,7 +140,9 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Run the metricset. This will loop until the passed reporter is cancelled. func (m *MetricSet) Run(r mb.PushReporterV2) { + m.terminated.Add(1) defer m.log.Infof("%s terminated.", fullName) + defer m.terminated.Done() defer m.Cleanup() st := NewState(r, @@ -235,7 +256,6 @@ func (m *MetricSet) Setup() (err error) { // var traceFS *tracing.TraceFS if m.config.TraceFSPath == nil { - if err := tracing.IsTraceFSAvailable(); err != nil { m.log.Debugf("tracefs/debugfs not found. Attempting to mount") for _, mount := range defaultMounts {