Skip to content

Commit

Permalink
[Auditbeat] Memory leak and syscall fix (#17500)
Browse files Browse the repository at this point in the history
* Add code for reaping sockets after a set timeout

* Fix broken fileset under 32-bit OS.

This unrelated bug was introduced by #15890 (unreleased).

* Add docs

* Update changelog

* Fix ordering issue

Co-authored-by: Adrian Serrano <[email protected]>
  • Loading branch information
Andrew Stucki and adriansr authored Apr 6, 2020
1 parent d74bdeb commit ff68af6
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 36 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Use max in k8s overview dashboard aggregations. {pull}17015[17015]
- Fix Disk Used and Disk Usage visualizations in the Metricbeat System dashboards. {issue}12435[12435] {pull}17272[17272]
- Fix missing Accept header for Prometheus and OpenMetrics module. {issue}16870[16870] {pull}17291[17291]
- Further revise check for bad data in docker/memory. {pull}17400[17400]
- Further revise check for bad data in docker/memory. {pull}17400[17400]
- Fix issue in Jolokia module when mbean contains multiple quoted properties. {issue}17375[17375] {pull}17374[17374]
- Combine cloudwatch aggregated metrics into single event. {pull}17345[17345]
- Fix how we filter services by name in system/service {pull}17400[17400]
Expand Down Expand Up @@ -184,6 +184,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

- Reference kubernetes manifests mount data directory from the host, so data persist between executions in the same node. {pull}17429[17429]
- Log to stderr when running using reference kubernetes manifests. {pull}17443[174443]
- Fix syscall kprobe arguments for 32-bit systems in socket module. {pull}17500[17500]
- Fix memory leak on when we miss socket close kprobe events. {pull}17500[17500]

*Filebeat*

Expand Down
6 changes: 6 additions & 0 deletions x-pack/auditbeat/module/system/socket/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ Determines how long to wait after a socket has been closed for out of order
packets. With TCP, some packets can be received shortly after a socket is
closed. If set too low, additional flows will be generated for those packets.

- `socket.socket_inactive_timeout` (default: 1m)

How long a socket can be inactive to be evicted from the internal cache.
A lower value reduces memory usage at the expense of some flows being
reported as multiple partial flows.

- `socket.perf_queue_size` (default: 4096)

The number of tracing samples that can be queued for processing. A larger value
Expand Down
12 changes: 6 additions & 6 deletions x-pack/auditbeat/module/system/socket/arch_386.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ var archVariables = common.MapStr{
"RET": "%ax",

// System call parameters
"SYS_P1": "$stack1",
"SYS_P2": "$stack2",
"SYS_P3": "$stack3",
"SYS_P4": "$stack4",
"SYS_P5": "$stack5",
"SYS_P6": "$stack6",
"_SYS_P1": "$stack1",
"_SYS_P2": "$stack2",
"_SYS_P3": "$stack3",
"_SYS_P4": "$stack4",
"_SYS_P5": "$stack5",
"_SYS_P6": "$stack6",
}
5 changes: 5 additions & 0 deletions x-pack/auditbeat/module/system/socket/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ type Config struct {
// considered closed.
FlowInactiveTimeout time.Duration `config:"socket.flow_inactive_timeout"`

// SocketInactiveTimeout determines how long a socket has to be inactive to be
// considered terminated or closed.
SocketInactiveTimeout time.Duration `config:"socket.socket_inactive_timeout"`

// FlowTerminationTimeout determines how long to wait after a flow has been
// closed for out of order packets. With TCP, some packets can be received
// shortly after a socket is closed. If set too low, additional flows will
Expand Down Expand Up @@ -71,6 +75,7 @@ var defaultConfig = Config{
ErrQueueSize: 1,
RingSizeExp: 7,
FlowInactiveTimeout: 30 * time.Second,
SocketInactiveTimeout: 60 * time.Second,
FlowTerminationTimeout: 5 * time.Second,
ClockMaxDrift: 100 * time.Millisecond,
ClockSyncPeriod: 10 * time.Second,
Expand Down
1 change: 1 addition & 0 deletions x-pack/auditbeat/module/system/socket/socket_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func (m *MetricSet) Run(r mb.PushReporterV2) {
st := NewState(r,
m.log,
m.config.FlowInactiveTimeout,
m.config.SocketInactiveTimeout,
m.config.FlowTerminationTimeout,
m.config.ClockMaxDrift)

Expand Down
76 changes: 50 additions & 26 deletions x-pack/auditbeat/module/system/socket/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ type socket struct {
closing bool
closeTime time.Time
prev, next linkedElement

createdTime, lastSeenTime time.Time
}

// Prev returns the previous socket in the linked list.
Expand Down Expand Up @@ -353,11 +355,14 @@ type state struct {
numFlows uint64

// configuration
inactiveTimeout, closeTimeout time.Duration
clockMaxDrift time.Duration
inactiveTimeout, closeTimeout, socketTimeout time.Duration
clockMaxDrift time.Duration

// lru used for flow expiration.
lru linkedList
flowLRU linkedList

// lru used for socket expiration.
socketLRU linkedList

// holds closed and expired flows.
done linkedList
Expand All @@ -373,10 +378,14 @@ func (s *state) getSocket(sock uintptr) *socket {
if socket, found := s.socks[sock]; found {
return socket
}
now := time.Now()
socket := &socket{
sock: sock,
sock: sock,
createdTime: now,
lastSeenTime: now,
}
s.socks[sock] = socket
s.socketLRU.add(socket)
return socket
}

Expand All @@ -385,20 +394,21 @@ var kernelProcess = process{
name: "[kernel_task]",
}

func NewState(r mb.PushReporterV2, log helper.Logger, inactiveTimeout, closeTimeout, clockMaxDrift time.Duration) *state {
s := makeState(r, log, inactiveTimeout, closeTimeout, clockMaxDrift)
func NewState(r mb.PushReporterV2, log helper.Logger, inactiveTimeout, socketTimeout, closeTimeout, clockMaxDrift time.Duration) *state {
s := makeState(r, log, inactiveTimeout, socketTimeout, closeTimeout, clockMaxDrift)
go s.reapLoop()
return s
}

func makeState(r mb.PushReporterV2, log helper.Logger, inactiveTimeout, closeTimeout, clockMaxDrift time.Duration) *state {
func makeState(r mb.PushReporterV2, log helper.Logger, inactiveTimeout, socketTimeout, closeTimeout, clockMaxDrift time.Duration) *state {
return &state{
reporter: r,
log: log,
processes: make(map[uint32]*process),
socks: make(map[uintptr]*socket),
threads: make(map[uint32]event),
inactiveTimeout: inactiveTimeout,
socketTimeout: socketTimeout,
closeTimeout: closeTimeout,
clockMaxDrift: clockMaxDrift,
dns: newDNSTracker(inactiveTimeout * 2),
Expand All @@ -422,7 +432,7 @@ func (s *state) logState() {
numSocks := len(s.socks)
numProcs := len(s.processes)
numThreads := len(s.threads)
lruSize := s.lru.size
flowLRUSize := s.flowLRU.size
doneSize := s.done.size
closingSize := s.closing.size
events := atomic.LoadUint64(&eventCount)
Expand All @@ -434,11 +444,11 @@ func (s *state) logState() {
lastEvents = events
lastTime = now
var errs []string
if uint64(lruSize) != numFlows {
if uint64(flowLRUSize) != numFlows {
errs = append(errs, "flow count mismatch")
}
msg := fmt.Sprintf("state flows=%d sockets=%d procs=%d threads=%d lru=%d done=%d closing=%d events=%d eps=%.1f",
numFlows, numSocks, numProcs, numThreads, lruSize, doneSize, closingSize, events,
numFlows, numSocks, numProcs, numThreads, flowLRUSize, doneSize, closingSize, events,
float64(newEvs)*float64(time.Second)/float64(took))
if errs == nil {
s.log.Debugf("%s", msg)
Expand Down Expand Up @@ -489,13 +499,23 @@ func (s *state) ExpireOlder() {
s.Lock()
defer s.Unlock()
deadline := time.Now().Add(-s.inactiveTimeout)
for item := s.lru.peek(); item != nil && item.Timestamp().Before(deadline); {
for item := s.flowLRU.peek(); item != nil && item.Timestamp().Before(deadline); {
if flow, ok := item.(*flow); ok {
s.onFlowTerminated(flow)
} else {
s.lru.get()
s.flowLRU.get()
}
item = s.lru.peek()
item = s.flowLRU.peek()
}

deadline = time.Now().Add(-s.socketTimeout)
for item := s.socketLRU.peek(); item != nil && item.Timestamp().Before(deadline); {
if sock, ok := item.(*socket); ok {
s.onSockDestroyed(sock.sock, 0)
} else {
s.socketLRU.get()
}
item = s.socketLRU.peek()
}

deadline = time.Now().Add(-s.closeTimeout)
Expand Down Expand Up @@ -638,19 +658,16 @@ func (s *state) mutualEnrich(sock *socket, f *flow) {
sock.process = s.getProcess(sock.pid)
f.process = sock.process
}
if !sock.closing {
sock.lastSeenTime = time.Now()
s.socketLRU.remove(sock)
s.socketLRU.add(sock)
}
}

func (s *state) createFlow(ref flow) error {
// Get or create a socket for this flow
sock, found := s.socks[ref.sock]
if !found {
sock = &socket{
sock: ref.sock,
}
s.socks[ref.sock] = sock

}

sock := s.getSocket(ref.sock)
ref.createdTime = ref.lastSeenTime
s.mutualEnrich(sock, &ref)

Expand All @@ -664,7 +681,7 @@ func (s *state) createFlow(ref flow) error {
sock.flows = make(map[string]*flow, 1)
}
sock.flows[ref.remote.addr.String()] = ptr
s.lru.add(ptr)
s.flowLRU.add(ptr)
s.numFlows++
return nil
}
Expand All @@ -673,10 +690,16 @@ func (s *state) createFlow(ref flow) error {
func (s *state) OnSockDestroyed(ptr uintptr, pid uint32) error {
s.Lock()
defer s.Unlock()

return s.onSockDestroyed(ptr, pid)
}

func (s *state) onSockDestroyed(ptr uintptr, pid uint32) error {
sock, found := s.socks[ptr]
if !found {
return nil
}

// Enrich with pid
if sock.pid == 0 && pid != 0 {
sock.pid = pid
Expand All @@ -689,6 +712,7 @@ func (s *state) OnSockDestroyed(ptr uintptr, pid uint32) error {
if !sock.closing {
sock.closeTime = time.Now()
sock.closing = true
s.socketLRU.remove(sock)
s.closing.add(sock)
}
return nil
Expand Down Expand Up @@ -721,8 +745,8 @@ func (s *state) UpdateFlowWithCondition(ref flow, cond func(*flow) bool) error {
s.mutualEnrich(sock, &ref)
prev.updateWith(ref, s)
s.enrichDNS(prev)
s.lru.remove(prev)
s.lru.add(prev)
s.flowLRU.remove(prev)
s.flowLRU.add(prev)
return nil
}

Expand Down Expand Up @@ -770,7 +794,7 @@ func (f *flow) updateWith(ref flow, s *state) {
}

func (s *state) onFlowTerminated(f *flow) {
s.lru.remove(f)
s.flowLRU.remove(f)
// Unbind this flow from its parent
if parent, found := s.socks[f.sock]; found {
delete(parent.flows, f.remote.addr.String())
Expand Down
Loading

0 comments on commit ff68af6

Please sign in to comment.