Skip to content

Commit

Permalink
Use heap for removal queue
Browse files Browse the repository at this point in the history
  • Loading branch information
mjwolf committed Apr 25, 2024
1 parent d074b3e commit bcb9a69
Showing 1 changed file with 71 additions and 28 deletions.
99 changes: 71 additions & 28 deletions x-pack/auditbeat/processors/sessionmd/processdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package processdb

import (
"container/heap"
"encoding/base64"
"errors"
"fmt"
Expand All @@ -26,11 +27,6 @@ import (
"github.com/elastic/elastic-agent-libs/logp"
)

const (
reaperInterval = 15 * time.Second // run the reaper process at this interval
removalTime = 10 * time.Second // remove processes that have been exited longer than this
)

type TTYType int

const (
Expand Down Expand Up @@ -192,7 +188,7 @@ type DB struct {
entryLeaderRelationships map[uint32]uint32
procfs procfs.Reader
stopChan chan struct{}
removalCandidates map[uint32]removalCandidate
removalCandidates rcHeap
}

func NewDB(reader procfs.Reader, logger logp.Logger) (*DB, error) {
Expand All @@ -206,8 +202,8 @@ func NewDB(reader procfs.Reader, logger logp.Logger) (*DB, error) {
entryLeaders: make(map[uint32]EntryType),
entryLeaderRelationships: make(map[uint32]uint32),
procfs: reader,
stopChan: make(chan struct{}),
removalCandidates: make(map[uint32]removalCandidate),
stopChan: make(chan struct{}),
removalCandidates: make(rcHeap, 0),
}
db.startReaper()
return &db, nil
Expand Down Expand Up @@ -425,10 +421,11 @@ func (db *DB) InsertExit(exit types.ProcessExitEvent) {
}
process.ExitCode = exit.ExitCode
db.processes[pid] = process
db.removalCandidates[pid] = removalCandidate{
heap.Push(&db.removalCandidates, removalCandidate{
pid: pid,
startTime: process.PIDs.StartTimeNS,
exitTime: time.Now(),
}
exitTime: time.Now(),
})
}

func interactiveFromTTY(tty types.TTYDev) bool {
Expand Down Expand Up @@ -742,11 +739,43 @@ func (db *DB) Close() {
close(db.stopChan)
}

// reaperInterval is the interval at which the reaper process runs.
const reaperInterval = 10 * time.Second

// removalTime is the age threshold for removal: processes that have been
// exited longer than this are removed.
const removalTime = 5 * time.Second

// removalCandidate records an exited process that is queued for removal.
type removalCandidate struct {
	pid       uint32    // PID of the exited process
	exitTime  time.Time // ordering key used by rcHeap
	startTime uint64    // start time recorded for the process when it was queued
}

// rcHeap is a slice of removal candidates whose methods provide the
// Len/Less/Swap/Push/Pop set used by container/heap. Less orders by exitTime,
// so the candidate with the earliest exit time is the minimum element.
type rcHeap []removalCandidate

// Len reports how many candidates are queued.
func (q rcHeap) Len() int {
	return len(q)
}

// Less reports whether the candidate at index i exited strictly before the
// candidate at index j.
func (q rcHeap) Less(i, j int) bool {
	return q[i].exitTime.Before(q[j].exitTime)
}

// Swap exchanges the candidates at indexes i and j.
func (q rcHeap) Swap(i, j int) {
	tmp := q[i]
	q[i] = q[j]
	q[j] = tmp
}

// Push appends x to the end of the slice. x must be a removalCandidate;
// any other type makes the type assertion panic.
func (q *rcHeap) Push(x any) {
	candidate := x.(removalCandidate)
	*q = append(*q, candidate)
}

// Pop removes the last element of the slice and returns it.
func (q *rcHeap) Pop() any {
	last := len(*q) - 1
	candidate := (*q)[last]
	*q = (*q)[0:last]
	return candidate
}

// The reaper will remove exited processes from the DB a short time after they have exited.
// Processes cannot be removed immediately when exiting, as the event enrichment will happen sometime
// afterwards, and will fail if the process is already removed from the DB.
Expand All @@ -755,38 +784,52 @@ type removalCandidate struct {
// it cannot have a relation with any other longer-lived processes. If this processor is ported to other OSs, this
// assumption will need to be revisited.
func (db *DB) startReaper() {
ticker := time.NewTicker(reaperInterval)
defer ticker.Stop()
now := time.Now()
go func(db *DB) {
ticker := time.NewTicker(reaperInterval)
defer ticker.Stop()

go func() {
h := &db.removalCandidates
heap.Init(h)
for {
select {
case <-ticker.C:
db.mutex.Lock()
for pid, c := range db.removalCandidates {
p, ok := db.processes[pid]
now := time.Now()
for {
if len(db.removalCandidates) == 0 {
break
}
v := heap.Pop(h)
c, ok := v.(removalCandidate)
if !ok {
db.logger.Debugf("pid %v was candidate for removal, but was already removed", pid)
delete(db.removalCandidates, pid)
db.logger.Errorf("unexpected item in removal queue: \"%v\"", v)
continue
}
if p.PIDs.StartTimeNS != c.startTime {
db.logger.Debugf("start times of removal candidate %v differs, not removing (PID had been reused?)", pid)
delete(db.removalCandidates, pid)
if now.Sub(c.exitTime) < removalTime {
// this candidate hasn't reached its timeout, put it back on the heap
// everything else will have a later exit time, so end this run
heap.Push(h, c)
break
}
p, ok := db.processes[c.pid]
if !ok {
db.logger.Errorf("pid %v was candidate for removal, but was already removed", c.pid)
continue
}
if now.Sub(c.exitTime) > removalTime {
delete(db.processes, pid)
delete(db.entryLeaders, pid)
delete(db.entryLeaderRelationships, pid)
delete(db.removalCandidates, pid)
if p.PIDs.StartTimeNS != c.startTime {
// this could happen if the PID has already rolled over and reached this PID again.
db.logger.Debugf("start times of removal candidate %v differs, not removing (PID had been reused?)", c.pid)
continue
}
db.logger.Errorf("Removing pid %v", c.pid)
delete(db.processes, c.pid)
delete(db.entryLeaders, c.pid)
delete(db.entryLeaderRelationships, c.pid)
}
db.mutex.Unlock()
case <-db.stopChan:
return
}
}
}()
}(db)
}

0 comments on commit bcb9a69

Please sign in to comment.