Skip to content

Commit

Permalink
Procstat: don't cache PIDs
Browse files Browse the repository at this point in the history
Changed the procstat input plugin to not cache PIDs. Solves influxdata#1636.
The logic of creating a process by pid was moved from `procstat.go` to
`spec_processor.go`.
  • Loading branch information
jarondl committed Dec 28, 2016
1 parent fd1feff commit b381083
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 56 deletions.
44 changes: 7 additions & 37 deletions plugins/inputs/procstat/procstat.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"strconv"
"strings"

"github.com/shirou/gopsutil/process"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
Expand All @@ -23,15 +21,12 @@ type Procstat struct {
User string
PidTag bool

// pidmap maps a pid to a process object, so we don't recreate every gather
pidmap map[int32]*process.Process
// tagmap maps a pid to a map of tags for that pid
tagmap map[int32]map[string]string
}

func NewProcstat() *Procstat {
return &Procstat{
pidmap: make(map[int32]*process.Process),
tagmap: make(map[int32]map[string]string),
}
}
Expand Down Expand Up @@ -67,49 +62,24 @@ func (_ *Procstat) Description() string {
}

func (p *Procstat) Gather(acc telegraf.Accumulator) error {
err := p.createProcesses()
pids, err := p.getAllPids()
if err != nil {
log.Printf("E! Error: procstat getting process, exe: [%s] pidfile: [%s] pattern: [%s] user: [%s] %s",
p.Exe, p.PidFile, p.Pattern, p.User, err.Error())
} else {
for pid, proc := range p.pidmap {
for _, pid := range pids {
if p.PidTag {
p.tagmap[pid]["pid"] = fmt.Sprint(pid)
}
p := NewSpecProcessor(p.ProcessName, p.Prefix, pid, acc, proc, p.tagmap[pid])
p.pushMetrics()
}
}

return nil
}

func (p *Procstat) createProcesses() error {
var errstring string
var outerr error

pids, err := p.getAllPids()
if err != nil {
errstring += err.Error() + " "
}

for _, pid := range pids {
_, ok := p.pidmap[pid]
if !ok {
proc, err := process.NewProcess(pid)
if err == nil {
p.pidmap[pid] = proc
} else {
errstring += err.Error() + " "
p := NewSpecProcessor(p.ProcessName, p.Prefix, pid, acc, p.tagmap[pid])
err := p.pushMetrics()
if err != nil {
log.Printf("E! Error: procstat: %s", err.Error())
}
}
}

if errstring != "" {
outerr = fmt.Errorf("%s", errstring)
}

return outerr
return nil
}

func (p *Procstat) getAllPids() ([]int32, error) {
Expand Down
2 changes: 0 additions & 2 deletions plugins/inputs/procstat/procstat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"strconv"
"testing"

"github.com/shirou/gopsutil/process"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -24,7 +23,6 @@ func TestGather(t *testing.T) {
p := Procstat{
PidFile: file.Name(),
Prefix: "foo",
pidmap: make(map[int32]*process.Process),
tagmap: make(map[int32]map[string]string),
}
p.Gather(&acc)
Expand Down
43 changes: 26 additions & 17 deletions plugins/inputs/procstat/spec_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,49 +2,57 @@ package procstat

import (
"time"
"fmt"

"github.com/shirou/gopsutil/process"

"github.com/influxdata/telegraf"
)

type SpecProcessor struct {
Prefix string
pid int32
tags map[string]string
fields map[string]interface{}
acc telegraf.Accumulator
proc *process.Process
ProcessName string
Prefix string
pid int32
tags map[string]string
fields map[string]interface{}
acc telegraf.Accumulator
proc *process.Process
}

func NewSpecProcessor(
processName string,
prefix string,
pid int32,
acc telegraf.Accumulator,
p *process.Process,
tags map[string]string,
) *SpecProcessor {
if processName != "" {
tags["process_name"] = processName
} else {
name, err := p.Name()
if err == nil {
tags["process_name"] = name
}
}
return &SpecProcessor{
ProcessName: processName,
Prefix: prefix,
pid: pid,
tags: tags,
fields: make(map[string]interface{}),
acc: acc,
proc: p,
}
}

func (p *SpecProcessor) pushMetrics() {
func (p *SpecProcessor) pushMetrics() (error) {
var prefix string
proc, err := process.NewProcess(p.pid)
if err != nil {
return fmt.Errorf("Failed to open process with pid '%d'. Error: '%s'",
p.pid, err)
}
p.proc = proc
if p.ProcessName != "" {
p.tags["process_name"] = p.ProcessName
} else {
name, err := p.proc.Name()
if err == nil {
p.tags["process_name"] = name
}
}

if p.Prefix != "" {
prefix = p.Prefix + "_"
}
Expand Down Expand Up @@ -107,4 +115,5 @@ func (p *SpecProcessor) pushMetrics() {
}

p.acc.AddFields("procstat", fields, p.tags)
return nil
}

0 comments on commit b381083

Please sign in to comment.