Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Auditbeat] Memory leak and syscall fix #17500

Merged
merged 6 commits into from
Apr 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Use max in k8s overview dashboard aggregations. {pull}17015[17015]
- Fix Disk Used and Disk Usage visualizations in the Metricbeat System dashboards. {issue}12435[12435] {pull}17272[17272]
- Fix missing Accept header for Prometheus and OpenMetrics module. {issue}16870[16870] {pull}17291[17291]
- Further revise check for bad data in docker/memory. {pull}17400[17400]
- Further revise check for bad data in docker/memory. {pull}17400[17400]
- Fix issue in Jolokia module when mbean contains multiple quoted properties. {issue}17375[17375] {pull}17374[17374]
- Combine cloudwatch aggregated metrics into single event. {pull}17345[17345]
- Fix how we filter services by name in system/service {pull}17400[17400]
Expand Down Expand Up @@ -183,6 +183,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
*Auditbeat*

- Log to stderr when running using reference kubernetes manifests. {pull}17443[174443]
- Fix syscall kprobe arguments for 32-bit systems in socket module. {pull}17500[17500]
- Fix memory leak on when we miss socket close kprobe events. {pull}17500[17500]

*Filebeat*

Expand Down
6 changes: 6 additions & 0 deletions x-pack/auditbeat/module/system/socket/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ Determines how long to wait after a socket has been closed for out of order
packets. With TCP, some packets can be received shortly after a socket is
closed. If set too low, additional flows will be generated for those packets.

- `socket.socket_inactive_timeout` (default: 1m)

How long a socket can be inactive to be evicted from the internal cache.
A lower value reduces memory usage at the expense of some flows being
reported as multiple partial flows.

- `socket.perf_queue_size` (default: 4096)

The number of tracing samples that can be queued for processing. A larger value
Expand Down
12 changes: 6 additions & 6 deletions x-pack/auditbeat/module/system/socket/arch_386.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ var archVariables = common.MapStr{
"RET": "%ax",

// System call parameters
"SYS_P1": "$stack1",
"SYS_P2": "$stack2",
"SYS_P3": "$stack3",
"SYS_P4": "$stack4",
"SYS_P5": "$stack5",
"SYS_P6": "$stack6",
"_SYS_P1": "$stack1",
"_SYS_P2": "$stack2",
"_SYS_P3": "$stack3",
"_SYS_P4": "$stack4",
"_SYS_P5": "$stack5",
"_SYS_P6": "$stack6",
}
5 changes: 5 additions & 0 deletions x-pack/auditbeat/module/system/socket/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ type Config struct {
// considered closed.
FlowInactiveTimeout time.Duration `config:"socket.flow_inactive_timeout"`

// SocketInactiveTimeout determines how long a socket has to be inactive to be
// considered terminated or closed.
SocketInactiveTimeout time.Duration `config:"socket.socket_inactive_timeout"`

// FlowTerminationTimeout determines how long to wait after a flow has been
// closed for out of order packets. With TCP, some packets can be received
// shortly after a socket is closed. If set too low, additional flows will
Expand Down Expand Up @@ -71,6 +75,7 @@ var defaultConfig = Config{
ErrQueueSize: 1,
RingSizeExp: 7,
FlowInactiveTimeout: 30 * time.Second,
SocketInactiveTimeout: 60 * time.Second,
FlowTerminationTimeout: 5 * time.Second,
ClockMaxDrift: 100 * time.Millisecond,
ClockSyncPeriod: 10 * time.Second,
Expand Down
1 change: 1 addition & 0 deletions x-pack/auditbeat/module/system/socket/socket_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func (m *MetricSet) Run(r mb.PushReporterV2) {
st := NewState(r,
m.log,
m.config.FlowInactiveTimeout,
m.config.SocketInactiveTimeout,
m.config.FlowTerminationTimeout,
m.config.ClockMaxDrift)

Expand Down
76 changes: 50 additions & 26 deletions x-pack/auditbeat/module/system/socket/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ type socket struct {
closing bool
closeTime time.Time
prev, next linkedElement

createdTime, lastSeenTime time.Time
}

// Prev returns the previous socket in the linked list.
Expand Down Expand Up @@ -353,11 +355,14 @@ type state struct {
numFlows uint64

// configuration
inactiveTimeout, closeTimeout time.Duration
clockMaxDrift time.Duration
inactiveTimeout, closeTimeout, socketTimeout time.Duration
clockMaxDrift time.Duration

// lru used for flow expiration.
lru linkedList
flowLRU linkedList

// lru used for socket expiration.
socketLRU linkedList

// holds closed and expired flows.
done linkedList
Expand All @@ -373,10 +378,14 @@ func (s *state) getSocket(sock uintptr) *socket {
if socket, found := s.socks[sock]; found {
return socket
}
now := time.Now()
socket := &socket{
sock: sock,
sock: sock,
createdTime: now,
lastSeenTime: now,
}
s.socks[sock] = socket
s.socketLRU.add(socket)
return socket
}

Expand All @@ -385,20 +394,21 @@ var kernelProcess = process{
name: "[kernel_task]",
}

func NewState(r mb.PushReporterV2, log helper.Logger, inactiveTimeout, closeTimeout, clockMaxDrift time.Duration) *state {
s := makeState(r, log, inactiveTimeout, closeTimeout, clockMaxDrift)
func NewState(r mb.PushReporterV2, log helper.Logger, inactiveTimeout, socketTimeout, closeTimeout, clockMaxDrift time.Duration) *state {
s := makeState(r, log, inactiveTimeout, socketTimeout, closeTimeout, clockMaxDrift)
go s.reapLoop()
return s
}

func makeState(r mb.PushReporterV2, log helper.Logger, inactiveTimeout, closeTimeout, clockMaxDrift time.Duration) *state {
func makeState(r mb.PushReporterV2, log helper.Logger, inactiveTimeout, socketTimeout, closeTimeout, clockMaxDrift time.Duration) *state {
return &state{
reporter: r,
log: log,
processes: make(map[uint32]*process),
socks: make(map[uintptr]*socket),
threads: make(map[uint32]event),
inactiveTimeout: inactiveTimeout,
socketTimeout: socketTimeout,
closeTimeout: closeTimeout,
clockMaxDrift: clockMaxDrift,
dns: newDNSTracker(inactiveTimeout * 2),
Expand All @@ -422,7 +432,7 @@ func (s *state) logState() {
numSocks := len(s.socks)
numProcs := len(s.processes)
numThreads := len(s.threads)
lruSize := s.lru.size
flowLRUSize := s.flowLRU.size
doneSize := s.done.size
closingSize := s.closing.size
events := atomic.LoadUint64(&eventCount)
Expand All @@ -434,11 +444,11 @@ func (s *state) logState() {
lastEvents = events
lastTime = now
var errs []string
if uint64(lruSize) != numFlows {
if uint64(flowLRUSize) != numFlows {
errs = append(errs, "flow count mismatch")
}
msg := fmt.Sprintf("state flows=%d sockets=%d procs=%d threads=%d lru=%d done=%d closing=%d events=%d eps=%.1f",
numFlows, numSocks, numProcs, numThreads, lruSize, doneSize, closingSize, events,
numFlows, numSocks, numProcs, numThreads, flowLRUSize, doneSize, closingSize, events,
float64(newEvs)*float64(time.Second)/float64(took))
if errs == nil {
s.log.Debugf("%s", msg)
Expand Down Expand Up @@ -489,13 +499,23 @@ func (s *state) ExpireOlder() {
s.Lock()
defer s.Unlock()
deadline := time.Now().Add(-s.inactiveTimeout)
for item := s.lru.peek(); item != nil && item.Timestamp().Before(deadline); {
for item := s.flowLRU.peek(); item != nil && item.Timestamp().Before(deadline); {
if flow, ok := item.(*flow); ok {
s.onFlowTerminated(flow)
} else {
s.lru.get()
s.flowLRU.get()
}
item = s.lru.peek()
item = s.flowLRU.peek()
}

deadline = time.Now().Add(-s.socketTimeout)
for item := s.socketLRU.peek(); item != nil && item.Timestamp().Before(deadline); {
if sock, ok := item.(*socket); ok {
s.onSockDestroyed(sock.sock, 0)
} else {
s.socketLRU.get()
}
item = s.socketLRU.peek()
}

deadline = time.Now().Add(-s.closeTimeout)
Expand Down Expand Up @@ -638,19 +658,16 @@ func (s *state) mutualEnrich(sock *socket, f *flow) {
sock.process = s.getProcess(sock.pid)
f.process = sock.process
}
if !sock.closing {
sock.lastSeenTime = time.Now()
s.socketLRU.remove(sock)
s.socketLRU.add(sock)
}
}

func (s *state) createFlow(ref flow) error {
// Get or create a socket for this flow
sock, found := s.socks[ref.sock]
if !found {
sock = &socket{
sock: ref.sock,
}
s.socks[ref.sock] = sock

}

sock := s.getSocket(ref.sock)
ref.createdTime = ref.lastSeenTime
s.mutualEnrich(sock, &ref)

Expand All @@ -664,7 +681,7 @@ func (s *state) createFlow(ref flow) error {
sock.flows = make(map[string]*flow, 1)
}
sock.flows[ref.remote.addr.String()] = ptr
s.lru.add(ptr)
s.flowLRU.add(ptr)
s.numFlows++
return nil
}
Expand All @@ -673,10 +690,16 @@ func (s *state) createFlow(ref flow) error {
func (s *state) OnSockDestroyed(ptr uintptr, pid uint32) error {
s.Lock()
defer s.Unlock()

return s.onSockDestroyed(ptr, pid)
}

func (s *state) onSockDestroyed(ptr uintptr, pid uint32) error {
sock, found := s.socks[ptr]
if !found {
return nil
}

// Enrich with pid
if sock.pid == 0 && pid != 0 {
sock.pid = pid
Expand All @@ -689,6 +712,7 @@ func (s *state) OnSockDestroyed(ptr uintptr, pid uint32) error {
if !sock.closing {
sock.closeTime = time.Now()
sock.closing = true
s.socketLRU.remove(sock)
s.closing.add(sock)
}
return nil
Expand Down Expand Up @@ -721,8 +745,8 @@ func (s *state) UpdateFlowWithCondition(ref flow, cond func(*flow) bool) error {
s.mutualEnrich(sock, &ref)
prev.updateWith(ref, s)
s.enrichDNS(prev)
s.lru.remove(prev)
s.lru.add(prev)
s.flowLRU.remove(prev)
s.flowLRU.add(prev)
return nil
}

Expand Down Expand Up @@ -770,7 +794,7 @@ func (f *flow) updateWith(ref flow, s *state) {
}

func (s *state) onFlowTerminated(f *flow) {
s.lru.remove(f)
s.flowLRU.remove(f)
// Unbind this flow from its parent
if parent, found := s.socks[f.sock]; found {
delete(parent.flows, f.remote.addr.String())
Expand Down
Loading