From b381083f5a768afd5be17b97cbd973debdb7afc4 Mon Sep 17 00:00:00 2001 From: Yaron de Leeuw Date: Wed, 28 Dec 2016 16:04:31 -0500 Subject: [PATCH] Procstat: don't cache PIDs Changed the procstat input plugin to not cache PIDs. Solves #1636. The logic of creating a process by pid was moved from `procstat.go` to `spec_processor.go`. --- plugins/inputs/procstat/procstat.go | 44 ++++------------------- plugins/inputs/procstat/procstat_test.go | 2 -- plugins/inputs/procstat/spec_processor.go | 43 +++++++++++++--------- 3 files changed, 33 insertions(+), 56 deletions(-) diff --git a/plugins/inputs/procstat/procstat.go b/plugins/inputs/procstat/procstat.go index 929490e4a2e23..565d0ebd13b38 100644 --- a/plugins/inputs/procstat/procstat.go +++ b/plugins/inputs/procstat/procstat.go @@ -8,8 +8,6 @@ import ( "strconv" "strings" - "github.com/shirou/gopsutil/process" - "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -23,15 +21,12 @@ type Procstat struct { User string PidTag bool - // pidmap maps a pid to a process object, so we don't recreate every gather - pidmap map[int32]*process.Process // tagmap maps a pid to a map of tags for that pid tagmap map[int32]map[string]string } func NewProcstat() *Procstat { return &Procstat{ - pidmap: make(map[int32]*process.Process), tagmap: make(map[int32]map[string]string), } } @@ -67,49 +62,24 @@ func (_ *Procstat) Description() string { } func (p *Procstat) Gather(acc telegraf.Accumulator) error { - err := p.createProcesses() + pids, err := p.getAllPids() if err != nil { log.Printf("E! Error: procstat getting process, exe: [%s] pidfile: [%s] pattern: [%s] user: [%s] %s", p.Exe, p.PidFile, p.Pattern, p.User, err.Error()) } else { - for pid, proc := range p.pidmap { + for _, pid := range pids { if p.PidTag { p.tagmap[pid]["pid"] = fmt.Sprint(pid) } - p := NewSpecProcessor(p.ProcessName, p.Prefix, pid, acc, proc, p.tagmap[pid]) - p.pushMetrics() - } - } - - return nil -} - -func (p *Procstat) createProcesses() error { - var errstring string - var outerr error - - pids, err := p.getAllPids() - if err != nil { - errstring += err.Error() + " " - } - - for _, pid := range pids { - _, ok := p.pidmap[pid] - if !ok { - proc, err := process.NewProcess(pid) - if err == nil { - p.pidmap[pid] = proc - } else { - errstring += err.Error() + " " + p := NewSpecProcessor(p.ProcessName, p.Prefix, pid, acc, p.tagmap[pid]) + err := p.pushMetrics() + if err != nil { + log.Printf("E! Error: procstat: %s", err.Error()) } } } - if errstring != "" { - outerr = fmt.Errorf("%s", errstring) - } - - return outerr + return nil } func (p *Procstat) getAllPids() ([]int32, error) { diff --git a/plugins/inputs/procstat/procstat_test.go b/plugins/inputs/procstat/procstat_test.go index ccc72bdbb2811..001537178720e 100644 --- a/plugins/inputs/procstat/procstat_test.go +++ b/plugins/inputs/procstat/procstat_test.go @@ -6,7 +6,6 @@ import ( "strconv" "testing" - "github.com/shirou/gopsutil/process" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -24,7 +23,6 @@ func TestGather(t *testing.T) { p := Procstat{ PidFile: file.Name(), Prefix: "foo", - pidmap: make(map[int32]*process.Process), tagmap: make(map[int32]map[string]string), } p.Gather(&acc) diff --git a/plugins/inputs/procstat/spec_processor.go b/plugins/inputs/procstat/spec_processor.go index 3b56fbc3e212a..f56de48e63e01 100644 --- a/plugins/inputs/procstat/spec_processor.go +++ b/plugins/inputs/procstat/spec_processor.go @@ -2,6 +2,7 @@ package procstat import ( "time" + "fmt" "github.com/shirou/gopsutil/process" @@ -9,12 +10,13 @@ import ( ) type SpecProcessor struct { - Prefix string - pid int32 - tags map[string]string - fields map[string]interface{} - acc telegraf.Accumulator - proc *process.Process + ProcessName string + Prefix string + pid int32 + tags map[string]string + fields map[string]interface{} + acc telegraf.Accumulator + proc *process.Process } func NewSpecProcessor( @@ -22,29 +24,35 @@ func NewSpecProcessor( prefix string, pid int32, acc telegraf.Accumulator, - p *process.Process, tags map[string]string, ) *SpecProcessor { - if processName != "" { - tags["process_name"] = processName - } else { - name, err := p.Name() - if err == nil { - tags["process_name"] = name - } - } return &SpecProcessor{ + ProcessName: processName, Prefix: prefix, pid: pid, tags: tags, fields: make(map[string]interface{}), acc: acc, - proc: p, } } -func (p *SpecProcessor) pushMetrics() { +func (p *SpecProcessor) pushMetrics() (error) { var prefix string + proc, err := process.NewProcess(p.pid) + if err != nil { + return fmt.Errorf("Failed to open process with pid '%d'. Error: '%s'", + p.pid, err) + } + p.proc = proc + if p.ProcessName != "" { + p.tags["process_name"] = p.ProcessName + } else { + name, err := p.proc.Name() + if err == nil { + p.tags["process_name"] = name + } + } + if p.Prefix != "" { prefix = p.Prefix + "_" } @@ -107,4 +115,5 @@ func (p *SpecProcessor) pushMetrics() { } p.acc.AddFields("procstat", fields, p.tags) + return nil }