Skip to content

Commit

Permalink
usm: watcher: Add periodic scan (#32400)
Browse files Browse the repository at this point in the history
 Conflicts:
	pkg/network/usm/sharedlibraries/watcher.go
	pkg/network/usm/sharedlibraries/watcher_test.go

Co-authored-by: Guy Arbitman <[email protected]>
  • Loading branch information
vitkyrka and guyarb committed Dec 27, 2024
1 parent ed59a4d commit f3c92d6
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 10 deletions.
80 changes: 70 additions & 10 deletions pkg/network/usm/sharedlibraries/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ import (
"github.com/DataDog/datadog-agent/pkg/util/log"
)

const (
var (
// The interval of the periodic scan for terminated processes. Increasing the interval might cause larger spikes in cpu
// and lowering it might cause constant cpu usage.
// and lowering it might cause constant cpu usage. This is a var instead of a const only because the test code changes
// this value to speed up test execution.
scanTerminatedProcessesInterval = 30 * time.Second
)

Expand All @@ -51,6 +52,7 @@ type Rule struct {

// Watcher provides a way to tie callback functions to the lifecycle of shared libraries
type Watcher struct {
syncMutex sync.RWMutex
wg sync.WaitGroup
done chan struct{}
procRoot string
Expand All @@ -59,6 +61,8 @@ type Watcher struct {
processMonitor *monitor.ProcessMonitor
registry *utils.FileRegistry
ebpfProgram *EbpfProgram
thisPID int
scannedPIDs map[uint32]int

// telemetry
libHits *telemetry.Counter
Expand All @@ -85,6 +89,7 @@ func NewWatcher(cfg *config.Config, rules ...Rule) (*Watcher, error) {
processMonitor: monitor.GetProcessMonitor(),
ebpfProgram: ebpfProgram,
registry: utils.NewFileRegistry("shared_libraries"),
scannedPIDs: make(map[uint32]int),

libHits: telemetry.NewCounter("usm.so_watcher.hits", telemetry.OptPrometheus),
libMatches: telemetry.NewCounter("usm.so_watcher.matches", telemetry.OptPrometheus),
Expand Down Expand Up @@ -188,13 +193,14 @@ func (w *Watcher) Start() {
return
}

thisPID, err := kernel.RootNSPID()
var err error
w.thisPID, err = kernel.RootNSPID()
if err != nil {
log.Warnf("Watcher Start can't get root namespace pid %s", err)
}

_ = kernel.WithAllProcs(w.procRoot, func(pid int) error {
if pid == thisPID { // don't scan ourself
if pid == w.thisPID { // don't scan ourself
return nil
}

Expand Down Expand Up @@ -247,18 +253,14 @@ func (w *Watcher) Start() {
case <-w.done:
return
case <-processSync.C:
processSet := w.registry.GetRegisteredProcesses()
deletedPids := monitor.FindDeletedProcesses(processSet)
for deletedPid := range deletedPids {
_ = w.registry.Unregister(deletedPid)
}
w.sync()
case event, ok := <-dataChannel:
if !ok {
return
}

lib := ToLibPath(event.Data)
if int(lib.Pid) == thisPID {
if int(lib.Pid) == w.thisPID {
// don't scan ourself
event.Done()
continue
Expand Down Expand Up @@ -288,3 +290,61 @@ func (w *Watcher) Start() {

utils.AddAttacher("native", w)
}

// sync reconciles the watcher's state with the processes currently listed in
// procfs. It unregisters from any terminated processes which we missed the
// exit callback for, and also attempts to attach to running processes that are
// not registered yet, to ensure that we don't miss any process.
//
// If the procfs walk itself fails, no cleanup is performed: the set of alive
// PIDs would be incomplete, and acting on it would unregister processes that
// are still running.
func (w *Watcher) sync() {
	// The mutex is only used for protection with the test code which reads the
	// scannedPIDs map.
	w.syncMutex.Lock()
	defer w.syncMutex.Unlock()

	// Every registered PID starts out as a candidate for removal; PIDs that
	// are seen during the walk below are taken off the list.
	deletionCandidates := w.registry.GetRegisteredProcesses()
	alivePIDs := make(map[uint32]struct{})

	// NOTE(review): Start() walks w.procRoot while this walks
	// kernel.ProcFSRoot() — presumably the same path; confirm.
	walkErr := kernel.WithAllProcs(kernel.ProcFSRoot(), func(origPid int) error {
		if origPid == w.thisPID { // don't scan ourselves
			return nil
		}

		pid := uint32(origPid)
		alivePIDs[pid] = struct{}{}

		if _, registered := deletionCandidates[pid]; registered {
			// We have previously hooked into this process and it remains
			// active, so we remove it from the deletionCandidates list, and
			// move on to the next PID.
			delete(deletionCandidates, pid)
			return nil
		}

		// Try to scan twice. This is because we may happen to scan the process
		// just after it has been exec'd and before it has opened its shared
		// libraries. Scanning twice, one sync interval apart, reduces the risk
		// of missing shared libraries because of this.
		if w.scannedPIDs[pid] >= 2 {
			return nil
		}

		w.scannedPIDs[pid]++
		if err := w.AttachPID(pid); err == nil {
			log.Debugf("watcher attached to %v via periodic scan", pid)
			// A successful attach needs no retry, so mark the PID as fully
			// scanned.
			w.scannedPIDs[pid] = 2
		}

		return nil
	})
	if walkErr != nil {
		// The callback never returns an error, so a failure here comes from
		// the walk itself. alivePIDs and deletionCandidates then reflect only
		// a partial view of the system; skip the cleanup rather than
		// unregistering processes which may still be alive.
		log.Warnf("watcher periodic scan could not walk processes, skipping cleanup: %s", walkErr)
		return
	}

	// Clean up dead processes from the list of scanned PIDs.
	for pid := range w.scannedPIDs {
		if _, alive := alivePIDs[pid]; !alive {
			delete(w.scannedPIDs, pid)
		}
	}

	// Whatever is left was registered but is no longer running. Unregister is
	// best-effort: there is nothing further to do for a dead process if it
	// fails.
	for pid := range deletionCandidates {
		_ = w.registry.Unregister(pid)
	}
}
94 changes: 94 additions & 0 deletions pkg/network/usm/sharedlibraries/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,100 @@ func (s *SharedLibrarySuite) TestSharedLibraryDetection() {
}, time.Second*10, 100*time.Millisecond)
}

// TestSharedLibraryDetectionPeriodic tests that the periodic scan is able to
// detect processes which are missed by the eBPF-based watcher, and that the
// bookkeeping of scanned PIDs is cleaned up once those processes exit.
func (s *SharedLibrarySuite) TestSharedLibraryDetectionPeriodic() {
	t := s.T()

	// Construct a large path to exceed the limits of the eBPF-based watcher
	// (LIB_PATH_MAX_SIZE). 255 is the max filename size of ext4. The path
	// size will also include the directories leading up to this filename so the
	// total size will be more.
	const final = "foo-libssl.so"
	filename := strings.Repeat("x", 255-len(final)) + final

	// Reduce interval to speed up test
	orig := scanTerminatedProcessesInterval
	t.Cleanup(func() { scanTerminatedProcessesInterval = orig })
	scanTerminatedProcessesInterval = 10 * time.Millisecond

	fooPath1, fooPathID1 := createTempTestFile(t, filename)
	// Second file whose registration callback always fails, so that the
	// retry behavior of the periodic scan can be observed.
	errPath, errorPathID := createTempTestFile(t, strings.Replace(filename, "xfoo", "yfoo", 1))

	registerRecorder := new(utils.CallbackRecorder)
	unregisterRecorder := new(utils.CallbackRecorder)

	registerCallback := registerRecorder.Callback()

	watcher, err := NewWatcher(config.New(),
		Rule{
			Re: regexp.MustCompile(`foo-libssl.so`),
			RegisterCB: func(fp utils.FilePath) error {
				registerCallback(fp)
				if fp.ID == errorPathID {
					return utils.ErrEnvironment
				}
				return nil
			},
			UnregisterCB: unregisterRecorder.Callback(),
		},
	)
	require.NoError(t, err)
	watcher.Start()
	t.Cleanup(watcher.Stop)

	// create files
	// The error is checked before touching command.Process so that a failed
	// spawn is reported as a test failure instead of a nil dereference.
	command1, err := fileopener.OpenFromAnotherProcess(t, fooPath1)
	require.NoError(t, err)
	pid := command1.Process.Pid

	command2, err := fileopener.OpenFromAnotherProcess(t, errPath)
	require.NoError(t, err)
	pid2 := command2.Process.Pid

	require.EventuallyWithT(t, func(c *assert.CollectT) {
		assert.Equal(c, 1, registerRecorder.CallsForPathID(fooPathID1))

		// Check that we tried to attach to the process twice. See w.sync() for
		// why we do it. We don't actually need to attempt the registration
		// twice, we just need to ensure that the maps were scanned twice but we
		// don't have a hook for that so this check should be good enough.
		assert.Equal(c, 2, registerRecorder.CallsForPathID(errorPathID))
	}, time.Second*10, 100*time.Millisecond)

	require.EventuallyWithT(t, func(c *assert.CollectT) {
		// Read-only access to scannedPIDs, which sync() mutates under the
		// write lock.
		watcher.syncMutex.RLock()
		defer watcher.syncMutex.RUnlock()

		assert.Contains(c, watcher.scannedPIDs, uint32(pid))
		assert.Contains(c, watcher.scannedPIDs, uint32(pid2))
	}, time.Second*10, 100*time.Millisecond)

	require.NoError(t, command1.Process.Kill())
	require.NoError(t, command2.Process.Kill())

	// Reap the children so that their PIDs disappear from procfs. The results
	// are deliberately ignored: the processes were just killed, and the
	// assertions below verify the outcome we care about.
	_, _ = command1.Process.Wait()
	_, _ = command2.Process.Wait()

	require.EventuallyWithT(t, func(c *assert.CollectT) {
		assert.Equal(c, 1, unregisterRecorder.CallsForPathID(fooPathID1))
	}, time.Second*10, 100*time.Millisecond)

	// Check that clean up of dead processes works.
	require.EventuallyWithT(t, func(c *assert.CollectT) {
		watcher.syncMutex.RLock()
		defer watcher.syncMutex.RUnlock()

		assert.NotContains(c, watcher.scannedPIDs, uint32(pid))
		assert.NotContains(c, watcher.scannedPIDs, uint32(pid2))
	}, time.Second*10, 100*time.Millisecond)
}

func (s *SharedLibrarySuite) TestSharedLibraryDetectionWithPIDAndRootNamespace() {
t := s.T()
_, err := os.Stat("/usr/bin/busybox")
Expand Down

0 comments on commit f3c92d6

Please sign in to comment.