Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat] Fix reference leak in TCP and Unix socket inputs #19459

Merged
merged 4 commits into from
Jun 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix `netflow` module to support 7 bytepad for IPFIX template. {issue}18098[18098]
- Fix Cisco ASA dissect pattern for 313008 & 313009 messages. {pull}19149[19149]
- Fix date and timestamp formats for fortigate module {pull}19316[19316]
- Fix memory leak in tcp and unix input sources. {pull}19459[19459]

*Heartbeat*

Expand Down
137 changes: 0 additions & 137 deletions filebeat/inputsource/common/closeref.go

This file was deleted.

12 changes: 4 additions & 8 deletions filebeat/inputsource/common/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package common

import (
"bufio"
"context"
"net"

"github.com/pkg/errors"
Expand All @@ -31,15 +32,15 @@ import (
type HandlerFactory func(config ListenerConfig) ConnectionHandler

// ConnectionHandler interface provides mechanisms for handling of incoming connections
type ConnectionHandler func(CloseRef, net.Conn) error
type ConnectionHandler func(context.Context, net.Conn) error

// MetadataFunc defines callback executed when a line is read from the split handler.
type MetadataFunc func(net.Conn) inputsource.NetworkMetadata

// SplitHandlerFactory allows creation of a handler that has splitting capabilities.
func SplitHandlerFactory(family Family, logger *logp.Logger, metadataCallback MetadataFunc, callback inputsource.NetworkFunc, splitFunc bufio.SplitFunc) HandlerFactory {
return func(config ListenerConfig) ConnectionHandler {
return ConnectionHandler(func(closer CloseRef, conn net.Conn) error {
return ConnectionHandler(func(ctx context.Context, conn net.Conn) error {
metadata := metadataCallback(conn)
maxMessageSize := uint64(config.MaxMessageSize)

Expand All @@ -60,16 +61,11 @@ func SplitHandlerFactory(family Family, logger *logp.Logger, metadataCallback Me
scanner.Buffer(buffer, int(maxMessageSize))
for {
select {
case <-closer.Done():
case <-ctx.Done():
break
default:
}

// Ensure that if the Conn is already closed then dont attempt to scan again
if closer.Err() == ErrClosed {
break
}

if !scanner.Scan() {
break
}
Expand Down
29 changes: 17 additions & 12 deletions filebeat/inputsource/common/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ package common
import (
"bufio"
"bytes"
"context"
"net"
"strings"
"sync"

"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/go-concert/ctxtool"
)

// Family represents the type of connection we're handling
Expand All @@ -51,9 +53,9 @@ type Listener struct {
config *ListenerConfig
family Family
wg sync.WaitGroup
done chan struct{}
log *logp.Logger
closer *Closer
ctx context.Context
cancel context.CancelFunc
clientsCount atomic.Int
handlerFactory HandlerFactory
listenerFactory ListenerFactory
Expand All @@ -63,10 +65,8 @@ type Listener struct {
func NewListener(family Family, location string, handlerFactory HandlerFactory, listenerFactory ListenerFactory, config *ListenerConfig) *Listener {
return &Listener{
config: config,
done: make(chan struct{}),
family: family,
log: logp.NewLogger(string(family)).With("address", location),
closer: NewCloser(nil),
handlerFactory: handlerFactory,
listenerFactory: listenerFactory,
}
Expand All @@ -80,7 +80,12 @@ func (l *Listener) Start() error {
return err
}

l.closer.SetCallback(func() { l.Listener.Close() })
l.ctx, l.cancel = context.WithCancel(context.Background())
go func() {
<-l.ctx.Done()
l.Listener.Close()
}()

l.log.Info("Started listening for " + l.family.String() + " connection")

l.wg.Add(1)
Expand All @@ -101,22 +106,21 @@ func (l *Listener) run() {
conn, err := l.Listener.Accept()
if err != nil {
select {
case <-l.closer.Done():
case <-l.ctx.Done():
return
default:
l.log.Debugw("Can not accept the connection", "error", err)
continue
}
}

handler := l.handlerFactory(*l.config)
closer := WithCloser(l.closer, func() { conn.Close() })

l.wg.Add(1)
go func() {
defer logp.Recover("recovering from a " + l.family.String() + " client crash")
defer l.wg.Done()
defer closer.Close()

ctx, cancel := ctxtool.WithFunc(l.ctx, func() { conn.Close() })
defer cancel()

l.registerHandler()
defer l.unregisterHandler()
Expand All @@ -128,7 +132,8 @@ func (l *Listener) run() {
l.log.Debugw("New client", "remote_address", conn.RemoteAddr(), "total", l.clientsCount.Load())
}

err := handler(closer, conn)
handler := l.handlerFactory(*l.config)
err := handler(ctx, conn)
if err != nil {
l.log.Debugw("client error", "error", err)
}
Expand All @@ -148,7 +153,7 @@ func (l *Listener) run() {
// Stop stops accepting new incoming connections and Close any active clients
func (l *Listener) Stop() {
l.log.Info("Stopping" + l.family.String() + "server")
l.closer.Close()
l.cancel()
l.wg.Wait()
l.log.Info(l.family.String() + " server stopped")
}
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,7 @@ go.opencensus.io v0.22.2 h1:75k/FF0Q2YM8QYo07VPddOLBslDt1MZOdEslOHvmzAs=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.uber.org/atomic v1.5.0 h1:OI5t8sDa1Or+q8AeE+yKeB/SDYioSHAgcVljj9JIETY=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/goleak v1.0.0 h1:qsup4IcBdlmsnGfqyLl4Ntn3C2XCCuKAE7DwHpScyUo=
go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.3.0 h1:sFPn2GLc3poCkfrpIXGhBD2X0CMIo4Q/zSULXrj/+uc=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
Expand Down