Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Monitor process by pidfile or exe name #240

Closed
wants to merge 10 commits into from
1 change: 1 addition & 0 deletions plugins/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
_ "github.com/influxdb/telegraf/plugins/nginx"
_ "github.com/influxdb/telegraf/plugins/ping"
_ "github.com/influxdb/telegraf/plugins/postgresql"
_ "github.com/influxdb/telegraf/plugins/procstat"
_ "github.com/influxdb/telegraf/plugins/prometheus"
_ "github.com/influxdb/telegraf/plugins/rabbitmq"
_ "github.com/influxdb/telegraf/plugins/redis"
Expand Down
104 changes: 104 additions & 0 deletions plugins/procstat/procstat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package procstat

import (
"fmt"
"github.com/influxdb/telegraf/plugins"
"github.com/shirou/gopsutil/process"
"io/ioutil"
"os/exec"
"strconv"
"strings"
"sync"
)

// Specification describes one process to monitor. Exactly how the process is
// located depends on which field is set: PidFile takes precedence over Exe
// (see createProcess).
type Specification struct {
	// PidFile is the path of a file whose content is the PID of the process.
	// The struct tag must be a quoted key:"value" pair; without the quotes
	// the tag cannot be looked up and the pid_file key is not mapped by tag.
	PidFile string `toml:"pid_file"`
	// Exe is the pattern handed to pgrep to look up the PID.
	Exe string
	// Prefix is prepended (followed by "_") to every metric name emitted for
	// this process.
	Prefix string
}

// Procstat is the plugin type registered by this package; it carries the
// list of process specifications that Gather iterates over.
type Procstat struct {
// Specifications holds one entry per monitored process.
Specifications []*Specification
}

// NewProcstat returns a pointer to a zero-valued Procstat (no specifications).
func NewProcstat() *Procstat {
	return new(Procstat)
}

// sampleConfig is the example configuration returned by SampleConfig.
// The keys must correspond to the fields of Specification: pid_file, exe and
// prefix. (The sample previously advertised a "name" key, which has no
// matching field.)
var sampleConfig = `
[[process.specifications]]
# pid file
pid_file = "/path/to/foo.pid"
# executable name (used by pgrep)
exe = "/path/to/foo"
prefix = "foo" # required
`

// SampleConfig returns the package-level example configuration text.
// The receiver is unused.
func (*Procstat) SampleConfig() string {
	return sampleConfig
}

// Description returns a one-line summary of the plugin.
// The receiver is unused.
func (*Procstat) Description() string {
	const summary = "Monitor process cpu and memory usage"
	return summary
}

// Gather resolves every configured specification to a process and pushes its
// metrics into acc, handling each specification in its own goroutine.
//
// It waits for all goroutines to finish and returns the first error that was
// recorded (in completion order, not configuration order), or nil when every
// specification succeeded. The error slot is guarded by a mutex so concurrent
// goroutines do not race on it, and a later success can no longer overwrite
// an earlier failure with nil.
func (p *Procstat) Gather(acc plugins.Accumulator) error {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		firstErr error
	)

	// record keeps only the first non-nil error reported by any goroutine.
	record := func(err error) {
		if err == nil {
			return
		}
		mu.Lock()
		defer mu.Unlock()
		if firstErr == nil {
			firstErr = err
		}
	}

	for _, specification := range p.Specifications {
		wg.Add(1)
		// Pass the loop variable as an argument so each goroutine works on
		// its own specification.
		go func(spec *Specification) {
			defer wg.Done()
			proc, err := spec.createProcess()
			if err != nil {
				record(err)
				return
			}
			record(NewSpecProcessor(spec.Prefix, acc, proc).pushMetrics())
		}(specification)
	}
	wg.Wait()
	return firstErr
}

// createProcess looks up the PID for this specification and wraps it in a
// process handle. PidFile is consulted first; Exe is used only when PidFile
// is empty. When both are empty an error is returned.
func (spec *Specification) createProcess() (*process.Process, error) {
	var (
		pid       int
		lookupErr error
	)
	switch {
	case spec.PidFile != "":
		pid, lookupErr = pidFromFile(spec.PidFile)
	case spec.Exe != "":
		pid, lookupErr = pidFromExe(spec.Exe)
	default:
		return nil, fmt.Errorf("Either exe or pid_file has to be specified")
	}
	if lookupErr != nil {
		return nil, lookupErr
	}
	return process.NewProcess(int32(pid))
}

// pidFromFile reads file and parses its whitespace-trimmed content as a
// decimal PID. A read failure yields -1 and a descriptive error; a parse
// failure yields whatever strconv.Atoi returns.
func pidFromFile(file string) (int, error) {
	raw, readErr := ioutil.ReadFile(file)
	if readErr != nil {
		return -1, fmt.Errorf("Failed to read pidfile '%s'. Error: '%s'", file, readErr)
	}
	trimmed := strings.TrimSpace(string(raw))
	return strconv.Atoi(trimmed)
}

// pidFromExe runs "pgrep exe" and parses the whitespace-trimmed output as a
// single decimal PID. If pgrep cannot be run or exits with an error, -1 and a
// descriptive error are returned. Output that is not a single integer (for
// example several PIDs on separate lines) fails in strconv.Atoi.
func pidFromExe(exe string) (int, error) {
	out, runErr := exec.Command("pgrep", exe).Output()
	if runErr != nil {
		return -1, fmt.Errorf("Failed to execute pgrep. Error: '%s'", runErr)
	}
	trimmed := strings.TrimSpace(string(out))
	return strconv.Atoi(trimmed)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering how this would behave should pgrep return multiple PIDs. I tend to run multiple Python interpreters on our servers, and setting "python" as the executable makes pgrep return the PID for each one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@arnimarj It will error out in that case, in the strconv.Atoi call. This is expected; for such scenarios I would prefer using a pid file or the master process's name.


// init registers the plugin under the name "process"; the registered factory
// hands out a fresh Procstat on every call.
func init() {
	factory := func() plugins.Plugin {
		return NewProcstat()
	}
	plugins.Add("process", factory)
}
28 changes: 28 additions & 0 deletions plugins/procstat/procstat_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package procstat

import (
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"io/ioutil"
"os"
"strconv"
"testing"
)

// TestGather writes the test process's own PID to a temporary pid file,
// gathers metrics for it under the "foo" prefix and checks that a CPU and a
// memory metric were recorded. Every intermediate error (write, close,
// gather) is checked so that a setup failure is reported at its source
// instead of surfacing later as a missing metric.
func TestGather(t *testing.T) {
	var acc testutil.Accumulator
	pid := os.Getpid()

	file, err := ioutil.TempFile(os.TempDir(), "telegraf")
	require.NoError(t, err)
	// Registered right after creation so the file is removed even when a
	// later require aborts the test.
	defer os.Remove(file.Name())

	_, err = file.Write([]byte(strconv.Itoa(pid)))
	require.NoError(t, err)
	require.NoError(t, file.Close())

	p := Procstat{
		Specifications: []*Specification{
			{PidFile: file.Name(), Prefix: "foo"},
		},
	}
	require.NoError(t, p.Gather(&acc))

	assert.True(t, acc.HasFloatValue("foo_cpu_user"))
	assert.True(t, acc.HasUIntValue("foo_memory_vms"))
}
107 changes: 107 additions & 0 deletions plugins/procstat/spec_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package procstat

import (
"fmt"
"github.com/influxdb/telegraf/plugins"
"github.com/shirou/gopsutil/process"
)

// SpecProcessor pushes the metrics of a single process into an accumulator,
// naming each metric "<Prefix>_<metric>".
type SpecProcessor struct {
// Prefix is joined to every metric name with an underscore.
Prefix string
// tags is passed along with every metric handed to acc.
tags map[string]string
// acc receives the metrics.
acc plugins.Accumulator
// proc is the process whose statistics are read.
proc *process.Process
}

// add hands value to the accumulator under the name Prefix + "_" + metric,
// together with the processor's tags.
func (p *SpecProcessor) add(metric string, value interface{}) {
	fullName := p.Prefix + "_" + metric
	p.acc.Add(fullName, value, p.tags)
}

// NewSpecProcessor builds a SpecProcessor for process p that reports into acc
// using prefix for metric names. The tag map starts out empty.
func NewSpecProcessor(prefix string, acc plugins.Accumulator, p *process.Process) *SpecProcessor {
	sp := new(SpecProcessor)
	sp.Prefix = prefix
	sp.tags = make(map[string]string)
	sp.acc = acc
	sp.proc = p
	return sp
}

// pushMetrics runs every metric collector in a fixed order (file descriptors,
// context switches, I/O, CPU, memory) and stops at the first one that fails,
// returning its error. It returns nil when all collectors succeed.
func (p *SpecProcessor) pushMetrics() error {
	collectors := []func() error{
		p.pushFDStats,
		p.pushCtxStats,
		p.pushIOStats,
		p.pushCPUStats,
		p.pushMemoryStats,
	}
	for _, collect := range collectors {
		if err := collect(); err != nil {
			return err
		}
	}
	return nil
}

// pushFDStats records the process's file-descriptor count as "num_fds".
// A failure to read the count is returned as a "NumFD error".
func (p *SpecProcessor) pushFDStats() error {
	count, err := p.proc.NumFDs()
	if err == nil {
		p.add("num_fds", count)
		return nil
	}
	return fmt.Errorf("NumFD error: %s\n", err)
}

// pushCtxStats records the voluntary and involuntary context-switch counters
// of the process. A failure to read them is returned as a
// "ContextSwitch error".
func (p *SpecProcessor) pushCtxStats() error {
	switches, err := p.proc.NumCtxSwitches()
	if err == nil {
		p.add("voluntary_context_switches", switches.Voluntary)
		p.add("involuntary_context_switches", switches.Involuntary)
		return nil
	}
	return fmt.Errorf("ContextSwitch error: %s\n", err)
}

// pushIOStats records the process's I/O counters: read/write operation counts
// and read/write byte totals. A failure to read the counters is returned as
// an "IOCounters error".
func (p *SpecProcessor) pushIOStats() error {
	io, err := p.proc.IOCounters()
	if err != nil {
		return fmt.Errorf("IOCounters error: %s\n", err)
	}
	p.add("read_count", io.ReadCount)
	p.add("write_count", io.WriteCount)
	p.add("read_bytes", io.ReadBytes)
	// write_bytes must come from WriteBytes; it previously repeated
	// WriteCount, so the metric reported the number of writes instead.
	p.add("write_bytes", io.WriteBytes)
	return nil
}

// pushCPUStats records the per-category CPU times of the process, one metric
// per field of the CPU times structure. The error from reading the CPU times
// is returned unchanged.
//
// Only the soft-IRQ metric carries the "soft_" infix. Steal, stolen, guest
// and guest_nice are separate categories and are named "cpu_<category>";
// they previously inherited a copy-pasted "cpu_soft_" prefix.
func (p *SpecProcessor) pushCPUStats() error {
	cpu, err := p.proc.CPUTimes()
	if err != nil {
		return err
	}
	p.add("cpu_user", cpu.User)
	p.add("cpu_system", cpu.System)
	p.add("cpu_idle", cpu.Idle)
	p.add("cpu_nice", cpu.Nice)
	p.add("cpu_iowait", cpu.Iowait)
	p.add("cpu_irq", cpu.Irq)
	p.add("cpu_soft_irq", cpu.Softirq)
	p.add("cpu_steal", cpu.Steal)
	p.add("cpu_stolen", cpu.Stolen)
	p.add("cpu_guest", cpu.Guest)
	p.add("cpu_guest_nice", cpu.GuestNice)
	return nil
}

// pushMemoryStats records the RSS, VMS and swap figures from the process's
// memory info. The error from reading the memory info is returned unchanged.
func (p *SpecProcessor) pushMemoryStats() error {
	info, err := p.proc.MemoryInfo()
	if err == nil {
		p.add("memory_rss", info.RSS)
		p.add("memory_vms", info.VMS)
		p.add("memory_swap", info.Swap)
	}
	return err
}