Skip to content

Commit

Permalink
[Filebeat] Fix reference leak in TCP and Unix socket inputs (elastic#…
Browse files Browse the repository at this point in the history
…19459)

The tcp and unix input sources were leaking references causing a memory leak.
When an accepted connection ended inputsource/common.Closer was
supposed to delete the pointer that it held to the connection, but due to a code
error `delete` was being called on the wrong map.

Instead of modifying the common.Closer I replaced it with a cancellable context.Context which
is designed to propagate signals from parent to children and requires less code.
  • Loading branch information
andrewkroh authored and melchiormoulin committed Oct 14, 2020
1 parent 452db6d commit d7c3ccf
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 157 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix `netflow` module to support 7 bytepad for IPFIX template. {issue}18098[18098]
- Fix Cisco ASA dissect pattern for 313008 & 313009 messages. {pull}19149[19149]
- Fix date and timestamp formats for fortigate module {pull}19316[19316]
- Fix memory leak in tcp and unix input sources. {pull}19459[19459]

*Heartbeat*

Expand Down
137 changes: 0 additions & 137 deletions filebeat/inputsource/common/closeref.go

This file was deleted.

12 changes: 4 additions & 8 deletions filebeat/inputsource/common/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package common

import (
"bufio"
"context"
"net"

"github.com/pkg/errors"
Expand All @@ -31,15 +32,15 @@ import (
type HandlerFactory func(config ListenerConfig) ConnectionHandler

// ConnectionHandler interface provides mechanisms for handling of incoming connections
type ConnectionHandler func(CloseRef, net.Conn) error
type ConnectionHandler func(context.Context, net.Conn) error

// MetadataFunc defines callback executed when a line is read from the split handler.
type MetadataFunc func(net.Conn) inputsource.NetworkMetadata

// SplitHandlerFactory allows creation of a handler that has splitting capabilities.
func SplitHandlerFactory(family Family, logger *logp.Logger, metadataCallback MetadataFunc, callback inputsource.NetworkFunc, splitFunc bufio.SplitFunc) HandlerFactory {
return func(config ListenerConfig) ConnectionHandler {
return ConnectionHandler(func(closer CloseRef, conn net.Conn) error {
return ConnectionHandler(func(ctx context.Context, conn net.Conn) error {
metadata := metadataCallback(conn)
maxMessageSize := uint64(config.MaxMessageSize)

Expand All @@ -60,16 +61,11 @@ func SplitHandlerFactory(family Family, logger *logp.Logger, metadataCallback Me
scanner.Buffer(buffer, int(maxMessageSize))
for {
select {
case <-closer.Done():
case <-ctx.Done():
break
default:
}

// Ensure that if the Conn is already closed then dont attempt to scan again
if closer.Err() == ErrClosed {
break
}

if !scanner.Scan() {
break
}
Expand Down
29 changes: 17 additions & 12 deletions filebeat/inputsource/common/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ package common
import (
"bufio"
"bytes"
"context"
"net"
"strings"
"sync"

"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/go-concert/ctxtool"
)

// Family represents the type of connection we're handling
Expand All @@ -51,9 +53,9 @@ type Listener struct {
config *ListenerConfig
family Family
wg sync.WaitGroup
done chan struct{}
log *logp.Logger
closer *Closer
ctx context.Context
cancel context.CancelFunc
clientsCount atomic.Int
handlerFactory HandlerFactory
listenerFactory ListenerFactory
Expand All @@ -63,10 +65,8 @@ type Listener struct {
func NewListener(family Family, location string, handlerFactory HandlerFactory, listenerFactory ListenerFactory, config *ListenerConfig) *Listener {
return &Listener{
config: config,
done: make(chan struct{}),
family: family,
log: logp.NewLogger(string(family)).With("address", location),
closer: NewCloser(nil),
handlerFactory: handlerFactory,
listenerFactory: listenerFactory,
}
Expand All @@ -80,7 +80,12 @@ func (l *Listener) Start() error {
return err
}

l.closer.SetCallback(func() { l.Listener.Close() })
l.ctx, l.cancel = context.WithCancel(context.Background())
go func() {
<-l.ctx.Done()
l.Listener.Close()
}()

l.log.Info("Started listening for " + l.family.String() + " connection")

l.wg.Add(1)
Expand All @@ -101,22 +106,21 @@ func (l *Listener) run() {
conn, err := l.Listener.Accept()
if err != nil {
select {
case <-l.closer.Done():
case <-l.ctx.Done():
return
default:
l.log.Debugw("Can not accept the connection", "error", err)
continue
}
}

handler := l.handlerFactory(*l.config)
closer := WithCloser(l.closer, func() { conn.Close() })

l.wg.Add(1)
go func() {
defer logp.Recover("recovering from a " + l.family.String() + " client crash")
defer l.wg.Done()
defer closer.Close()

ctx, cancel := ctxtool.WithFunc(l.ctx, func() { conn.Close() })
defer cancel()

l.registerHandler()
defer l.unregisterHandler()
Expand All @@ -128,7 +132,8 @@ func (l *Listener) run() {
l.log.Debugw("New client", "remote_address", conn.RemoteAddr(), "total", l.clientsCount.Load())
}

err := handler(closer, conn)
handler := l.handlerFactory(*l.config)
err := handler(ctx, conn)
if err != nil {
l.log.Debugw("client error", "error", err)
}
Expand All @@ -148,7 +153,7 @@ func (l *Listener) run() {
// Stop stops accepting new incoming connections and Close any active clients
func (l *Listener) Stop() {
l.log.Info("Stopping" + l.family.String() + "server")
l.closer.Close()
l.cancel()
l.wg.Wait()
l.log.Info(l.family.String() + " server stopped")
}
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,7 @@ go.opencensus.io v0.22.2 h1:75k/FF0Q2YM8QYo07VPddOLBslDt1MZOdEslOHvmzAs=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.uber.org/atomic v1.5.0 h1:OI5t8sDa1Or+q8AeE+yKeB/SDYioSHAgcVljj9JIETY=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/goleak v1.0.0 h1:qsup4IcBdlmsnGfqyLl4Ntn3C2XCCuKAE7DwHpScyUo=
go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.3.0 h1:sFPn2GLc3poCkfrpIXGhBD2X0CMIo4Q/zSULXrj/+uc=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
Expand Down

0 comments on commit d7c3ccf

Please sign in to comment.